Benjamin
07/02/2020, 7:30 PM
Our flow works with flow.run(executor=executor) but not with flow.register().
We're using the FargateCluster to create a new cluster for the flow. It's set up properly with flow.register, but no processing is done. I asked about this a couple of days ago here, but now I have a reproducible example.
I'll provide the details in this thread.
Thanks a lot
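A minimal sketch of one way to handle this, assuming a Prefect 0.12-era API where the executor can be attached to the flow's environment before registration (cluster_class/cluster_kwargs support on DaskExecutor depends on the exact release, so treat this as an assumption to verify):

from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# Sketch: store the FargateCluster setup on the flow itself, so agent-triggered
# runs of the registered flow also create the cluster. flow.run(executor=...)
# only affects that one local run and is not persisted by flow.register().
flow.environment = LocalEnvironment(
    executor=DaskExecutor(
        cluster_class="dask_cloudprovider.FargateCluster",
        cluster_kwargs={"n_workers": 4},  # illustrative; use your image/cluster config
    )
)
flow.register()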
Bob Colner
07/02/2020, 9:58 PM
Greg Desmarais
07/02/2020, 10:55 PM
from datetime import datetime

from distributed import get_worker
from prefect import Flow, Parameter, task

@task(log_stdout=True)
def say_hello(name):
    print(f'{datetime.now()}: workflow hello {name}', flush=True)
    worker = get_worker()
    return f'done on {worker.name}, scheduler at {worker.scheduler.address}'

name = Parameter('name')
with Flow("Simple parallel hello") as flow:
    # Since there is no return value dependency, we end up with possible parallel operations
    for i in range(10):
        say_hello(name)
If I run the flow from my script, targeting a particular Dask cluster, I can hit the right Dask workers:
executor = DaskExecutor(address=dask_scheduler_address)
flow.run(executor=executor)
My question is about registering this flow and running it, say, from the prefect ui. I can easily register the flow with:
flow.register()
But then trying to run it from the UI just hangs. I'm pretty sure it is because the executor isn't registered with the flow. Am I missing something? Thanks in advance...
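One hedged possibility, assuming Prefect ~0.12: flow.register() does not persist the executor passed to flow.run(), so attach it to the flow's environment before registering and agent-launched runs should target the same scheduler.

from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# Sketch: keep the Dask scheduler address on the flow so UI-triggered runs use it.
flow.environment = LocalEnvironment(
    executor=DaskExecutor(address=dask_scheduler_address)
)
flow.register()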
Adrien Boutreau
07/03/2020, 1:03 PM
Vitor Avancini
07/03/2020, 2:53 PM
Bob Colner
07/03/2020, 4:13 PM
Bob Colner
07/03/2020, 4:51 PM
from io import BytesIO
from pathlib import Path
from time import time_ns

import pandas as pd
from prefect.engine.serializers import Serializer

class ParquetSerializer(Serializer):
    def serialize(self, value: pd.DataFrame) -> bytes:
        # transform a Python object into bytes
        tmp_filename = str(time_ns()) + '.parquet'
        value.to_parquet(path=tmp_filename, index=False)
        with open(tmp_filename, 'rb') as in_file:
            df_bytes = in_file.read()
        Path(tmp_filename).unlink()
        return df_bytes

    def deserialize(self, value: bytes) -> pd.DataFrame:
        # recover a Python object from bytes
        df_bytes_io = BytesIO(value)
        df = pd.read_parquet(df_bytes_io)
        return df
Does anyone have thoughts about the above approach? (saving as a local file, then reading the bytes from the file?)
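It may be possible to skip the temp file entirely; a minimal sketch, assuming a pandas/pyarrow combination whose to_parquet accepts a file-like buffer:

from io import BytesIO

import pandas as pd
from prefect.engine.serializers import Serializer

class InMemoryParquetSerializer(Serializer):
    def serialize(self, value: pd.DataFrame) -> bytes:
        # write the parquet bytes straight into an in-memory buffer
        buffer = BytesIO()
        value.to_parquet(buffer, index=False)
        return buffer.getvalue()

    def deserialize(self, value: bytes) -> pd.DataFrame:
        return pd.read_parquet(BytesIO(value))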
Bob Colner
07/03/2020, 5:11 PM
Luke Orland
07/03/2020, 7:38 PM
nuks
07/04/2020, 5:26 AM
nuks
07/04/2020, 5:37 AM
Brad
07/04/2020, 11:43 PM
extra_docker_kwargs to CreateContainer: https://github.com/PrefectHQ/prefect/pull/2915
nuks
07/05/2020, 12:05 AM
Kostas Chalikias
07/06/2020, 1:22 PM
Kai Weber
07/06/2020, 1:50 PM
Ben Fogelson
07/06/2020, 4:59 PM
I'm running into an error with flow.replace, which seems to have to do with the fact that flow.remove(task) doesn't remove task from flow.slugs:
from prefect import Parameter, Flow, Task
p1 = Parameter('p')
p2 = Parameter('p')
t1 = Task()
t2 = Task()
flow = Flow('flow')
flow.add_task(p1)
flow.add_task(t1)
print(flow.slugs)
# {<Parameter: p>: 'p', <Task: Task>: 'Task-1'}
flow.replace(t1, t2)
print(flow.slugs)
# {<Parameter: p>: 'p', <Task: Task>: 'Task-1', <Task: Task>: 'Task-1'}
flow.replace(p1, p2)
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-7-5ec138031eb6> in <module>
----> 1 flow.replace(p1, p2)
/opt/conda/envs/drugdiscovery/lib/python3.6/site-packages/prefect/core/flow.py in replace(self, old, new, validate)
291 # update tasks
292 self.tasks.remove(old)
--> 293 self.add_task(new)
294
295 self._cache.clear()
/opt/conda/envs/drugdiscovery/lib/python3.6/site-packages/prefect/core/flow.py in add_task(self, task)
482 raise ValueError(
483 'A task with the slug "{}" already exists in this '
--> 484 "flow.".format(task.slug)
485 )
486 self.slugs[task] = task.slug or self._generate_task_slug(task)
ValueError: A task with the slug "p" already exists in this flow.
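One untested workaround sketch, assuming flow.slugs is the plain task-to-slug dict shown above (internals may differ between Prefect versions):

# Hypothetical workaround, not an official API: drop the old parameter's slug
# entry first, so the add_task() call inside flow.replace() no longer sees a
# conflicting "p" slug.
flow.slugs.pop(p1, None)
flow.replace(p1, p2)
print(flow.slugs)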
Marwan Sarieddine
07/06/2020, 5:17 PM
from prefect import task
from prefect.engine import signals

@task
def signal_task(run_task: bool):
    if not run_task:
        raise signals.SKIP()
james.lamb
07/06/2020, 6:23 PM
How should I handle prefect version differences between the environment where a flow is created and registered (flow.register()) and the environment where the agent runs?
So, concretely, let's say that I have an agent running using the image prefecthq/prefect:0.12.1-python3.7. I have some flows already being run by that agent, and those flows were created with prefect 0.12.1.
Imagine that prefect 0.13.0 has just been released on PyPi, and data scientists on my team are going to pip install prefect, create flows, and register them with flow.register().
What should I do?
1. Add a label to each agent with the prefect major + minor version (e.g. v0.12, v0.13). Make sure flows are registered with such a version label, so 0.12 flows run on the 0.12 agent and 0.13 flows run on the 0.13 agent (see the sketch after this message). prefect uses semantic versioning, which means that there can be breaking changes between minor releases in the 0.x series.
2. Do nothing. A flow created with 0.13.x should be expected to work with an agent running 0.12.x.
3. Upgrade all those 0.12 flows to 0.13. Prefect Cloud is only ever running a single version of prefect, and if your flows + agent are not in sync with that version, bad things will happen.
4. Something else.
Thanks for your time and consideration!
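For option 1, a small sketch of how the labels might be wired up (hedged: labels on environments and agents exist in the 0.12 line, but verify against your version; the label string is purely illustrative):

import prefect
from prefect.environments import LocalEnvironment

# Sketch: derive a major.minor label from the installed prefect at registration
# time. An agent started with the same label (e.g. DockerAgent(labels=["prefect-0.12"]))
# would then pick up only flows registered from a matching version.
version_label = "prefect-" + ".".join(prefect.__version__.split(".")[:2])
flow.environment = LocalEnvironment(labels=[version_label])
flow.register()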
Bob Colner
07/06/2020, 6:40 PM
I'm trying to use the itertools.product function to map over multiple tasks.
However, I can't get this to work in a simple example:
import itertools
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor

@task(checkpoint=False)
def cross_product(x, y) -> list:
    return list(itertools.product(x, y))

@task(checkpoint=False)
def concat(a: str, b: str) -> str:
    return a + b

a = ['d', 'o', 'g']
b = ['c', 'a', 't']

with Flow(name='zip-map-test') as flow:
    result = concat.map(cross_product(a, b))

executor = LocalDaskExecutor(scheduler='threads')
flow_state = flow.run(executor=executor)
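A sketch of one variant that should map cleanly: concat takes two arguments, but the cross product yields a single tuple per mapped call, so have the mapped task accept the pair itself (concat_pair below is a renamed stand-in for concat):

import itertools
from prefect import Flow, task
from prefect.engine.executors import LocalDaskExecutor

@task(checkpoint=False)
def cross_product(x, y) -> list:
    return list(itertools.product(x, y))

@task(checkpoint=False)
def concat_pair(pair) -> str:
    # each mapped call receives one (a, b) tuple from the cross product
    a, b = pair
    return a + b

a = ['d', 'o', 'g']
b = ['c', 'a', 't']

with Flow(name='zip-map-test') as flow:
    result = concat_pair.map(cross_product(a, b))

executor = LocalDaskExecutor(scheduler='threads')
flow_state = flow.run(executor=executor)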
Matt Allen
07/06/2020, 7:19 PM
I've tried the one that runs prefect server start, the one that runs prefect agent docker, and the one that runs the script that registers the flow, all with no luck.
alex
07/06/2020, 11:24 PM
Each source has a run method, and calling run on a parent source calls the run method for each of the children. Currently, all the top-level sources are called sequentially and then finally aggregated. Does anyone have any ideas on the most "prefectic" way to structure this to maximize parallelization and conciseness?
One way is to define a task for each high-level source, but this seems to be a bit tedious. We also won't be able to parallelize the children. Another approach I tried is something like this:
from prefect import task
from prefect.tasks.control_flow import case, merge

@task
def run_for_child(child):
    child.run(max_items=50)
    return child.collection_name

@task
def run_source(source):
    cond = source.is_parent
    with case(cond, True):
        res1 = run_for_child.map(source.children)
    with case(cond, False):
        source.run(max_items=50)
        res2 = source.collection_name
    return merge(res1, res2)

@task
def get_highlevel_sources(config):
    ...  # return list[Source] based off config

# In the flow, get the sources and do run_source.map. At the end, aggregate all collections.
This gave me a "could not infer active flow context" error, and I'm not sure if this is the best way to structure this anyway.
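One possible restructuring, sketched under the assumption that a plain task can expand parents into their children before mapping; get_highlevel_sources and config are taken from the snippet above, the other names are illustrative:

from prefect import Flow, task

@task
def expand_to_leaves(sources):
    # flatten parent sources into their children; keep non-parents as-is
    leaves = []
    for source in sources:
        leaves.extend(source.children if source.is_parent else [source])
    return leaves

@task
def run_leaf(leaf):
    leaf.run(max_items=50)
    return leaf.collection_name

@task
def aggregate(collection_names):
    ...  # aggregate all collections here

with Flow("sources") as flow:
    sources = get_highlevel_sources(config)  # config as in the snippet above
    leaves = expand_to_leaves(sources)
    collections = run_leaf.map(leaves)
    aggregate(collections)

This keeps case/merge out of task bodies (they only work inside a flow context) and lets every leaf run in parallel under a Dask executor.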
Brett Naul
07/07/2020, 12:25 AM
Avi A
07/07/2020, 9:05 AM
Task B's output is already stored as a Result, and I just want to run task C. In Luigi, C would ask for B's output and only fetch that. In prefect, however, the flow runner will fetch ALL the results of the mapped task A; since we already have B's result, that's totally redundant and wastes a lot of time and data exchange.
Any idea on how to tackle this issue? i.e. fetch only task B's results somehow so that C can run?
Ankit
07/07/2020, 12:04 PM
Francisco
07/07/2020, 2:40 PM
alex
07/07/2020, 4:27 PM
with Flow("My flow", schedule=schedule) as flow:
    for source in all_sources:
        source.bind(
            ## parameters for my run() function here
        )
    res = [source() for source in all_sources]
In the schematic and gantt chart I have a dag with all the sources leading to a list, which is expected, but I also have duplicated source tasks without any edges.
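A hedged guess at the duplication: inside a flow context, source.bind(...) adds the bound task to the flow, and a subsequent source() call adds a copy of it, so each source can end up in the DAG twice. A minimal sketch of calling each task only once (run_kwargs is a hypothetical stand-in for the arguments that were passed to bind):

with Flow("My flow", schedule=schedule) as flow:
    # call each source exactly once, passing the run() parameters directly,
    # instead of bind()-ing it and then calling it a second time
    res = [source(**run_kwargs) for source in all_sources]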
itay livni
07/07/2020, 5:40 PM
with Flow("hello"):
    some_lst = make_lst()
    with case(some_lst.not_(), False):
        # do something
Zach
07/07/2020, 6:16 PM
I'm trying to use the ifelse fn in prefect.
I have a task, we can call it Task A, that queries my database for a value and returns it. If Task A returns some value, then that value should get passed to Task B, whose output gets passed to Task C, whose output goes to Task D.
But if Task A returns None, then I don't want Task B/C/D to run.
Right now I have it set up like this:
TASK_A = query_database_task()
RUN_TASKS_B_C_d = dummy_task() # task does nothing
ifelse(is_null(TASK_A), RUN_TASKS_B_C_d, None)
TASK_B = some_task_b(TASK_A)
TASK_B.set_upstream(RUN_TASKS_B_C_d)
TASK_C = some_task_b(TASK_B)
TASK_C.set_upstream(RUN_TASKS_B_C_d)
TASK_D = some_task_b(TASK_C)
TASK_D.set_upstream(RUN_TASKS_B_C_d)
There has to be a better way to do this.
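A sketch of one alternative that uses case instead of ifelse plus a dummy task (hedged: has_value is an illustrative helper, and some_task_b/c/d stand in for the real Task B/C/D):

from prefect import Flow, task, case

@task
def query_database_task():
    ...  # returns a value or None

@task
def has_value(x) -> bool:
    return x is not None

@task
def some_task_b(x):
    return x

@task
def some_task_c(x):
    return x

@task
def some_task_d(x):
    return x

with Flow("b-c-d-only-when-a-has-a-value") as flow:
    a = query_database_task()
    with case(has_value(a), True):
        # B, C and D only run when Task A returned something other than None
        d = some_task_d(some_task_c(some_task_b(a)))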
Bob Colner
07/07/2020, 9:18 PM
Brett Naul
07/07/2020, 9:19 PM
Is task.map(sequence, x=unmapped(some_large_result)) a bad idea? We're seeing UserWarning: Large object of size 874.55 MB detected in task graph and very very slow creation of mapped tasks; I would have expected that these would be re-using the dask futures already present on the cluster, not rehydrating the outputs inside the FlowRunner loop, but I guess that's not how it works. Is this expected behavior, and if so is there any known workaround?
Jim Crist-Harif
07/07/2020, 9:20 PM
That's expected if some_large_result is not the result of a task (e.g. if it's a constant).
If it is the result of a task, we should be able to make this efficient, but the flow runner might be doing something dumb.
Brett Naul
07/07/2020, 9:24 PM
It seems to be .submit-ing the actual result bytes for every task.
Jim Crist-Harif
07/07/2020, 9:26 PM
Brett Naul
07/07/2020, 9:27 PM
Jim Crist-Harif
07/07/2020, 9:52 PM