Kristoffer Sahlin
10/15/2019, 10:23 AM

Jeff Yun
10/15/2019, 1:46 PM
Is there a way to get something like task_runner.get_task_inputs(state, upstream_states), but without having to run the flow?

dmo
10/16/2019, 1:49 PM
Say I define a utility function in my_utility.py:
def get_answer_to_everything():
    return 42
And then I define my flow in one_of_many_flows.py:
import my_utility
from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def run_imported_function():
    return my_utility.get_answer_to_everything()

with Flow("Imported function") as flow1:
    run_imported_function()

flow1.run()  # 42
flow1.run(executor=DaskExecutor(address=IP))  # ModuleNotFoundError: No module named 'my_utility'
I know this is due to how the serialization works in Python. However, Dask provides a way to upload files to workers through Executor.upload_file:
https://stackoverflow.com/questions/39295200/can-i-use-functions-imported-from-py-files-in-dask-distributed
However, I have found no way to perform anything similar through the Prefect API. Am I looking in the wrong place, or is this use case out of scope?
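For what it's worth, a minimal sketch of the Dask-side workaround the linked answer describes, going through dask.distributed directly rather than any Prefect API (IP is the scheduler address from the snippet above):

from dask.distributed import Client
from prefect.engine.executors import DaskExecutor

# connect to the same scheduler the executor will use and ship the module
client = Client(IP)
client.upload_file("my_utility.py")  # workers can now import my_utility

flow1.run(executor=DaskExecutor(address=IP))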
Chris Eberly
10/16/2019, 5:12 PM

itay livni
10/19/2019, 1:42 AM

itay livni
10/21/2019, 6:47 PM

John Faucett
10/21/2019, 7:09 PM

John Faucett
10/21/2019, 7:10 PM

An Hoang
10/21/2019, 9:47 PM
[2019-10-21 21:17:43,141] INFO - prefect.FlowRunner | Beginning Flow run for 'dask-example'
[2019-10-21 21:17:43,144] INFO - prefect.FlowRunner | Starting flow run.
[2019-10-21 21:17:43,145] ERROR - prefect.FlowRunner | Unexpected error: ValueError("Unexpected keyword arguments: ['processes', 'silence_logs']")
Traceback (most recent call last):
File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 393, in get_flow_run_state
with executor.start():
File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/contextlib.py", line 112, in __enter__
return next(self.gen)
File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 74, in start
self.address, processes=self.local_processes, **self.kwargs
File "/lab/corradin_data/FOR_AN/anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/distributed/client.py", line 649, in __init__
"Unexpected keyword arguments: {}".format(str(sorted(kwargs)))
ValueError: Unexpected keyword arguments: ['processes', 'silence_logs']
[2019-10-21 21:17:43,149] ERROR - prefect.Flow: dask-example | Unexpected error occured in FlowRunner: ValueError("Unexpected keyword arguments: ['processes', 'silence_logs']")
conda versions:
# Name Version Build Channel
dask 2.6.0 py_0 conda-forge
dask-core 2.6.0 py_0 conda-forge
dask-glm 0.2.0 py_1 conda-forge
dask-jobqueue 0.6.3 py_0 conda-forge
dask-ml 1.0.0 py_1 conda-forge
# Name Version Build Channel
prefect 0.6.6 py_0 conda-forge
cluster configuration:
from dask_jobqueue import LSFCluster

cluster = LSFCluster(
    queue='all_corradin',
    cores=47,
    #processes = 2,
    walltime='5000:00',
    memory='250GB',
    local_directory="/tmp",
    job_extra=['-o /dev/null', '-e /dev/null'],
    scheduler_port=8786,
    worker_dashboard_address=8788,
)
cluster.scale(17)
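For reference, a sketch of the kind of call that produces the traceback above; the DaskExecutor usage is an assumption (only the cluster setup was shared), and flow stands for the 'dask-example' flow:

from prefect.engine.executors import DaskExecutor

# DaskExecutor forwards `processes` and `silence_logs` to distributed.Client,
# which this version of distributed rejects (see the traceback above)
executor = DaskExecutor(address=cluster.scheduler_address)
flow.run(executor=executor)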
Argemiro Neto
10/22/2019, 12:05 AM
I have a task being mapped with the following signature:
@task(max_retries=3, retry_delay=timedelta(seconds=5))
def run_sync_loads(config: dict) -> dict:
The `task`'s results are being sent to the following `task`:
@task(max_retries=3, retry_delay=timedelta(seconds=5), trigger=triggers.always_run)
def save_model_config(loads: list, configs: list):
Both are reference tasks to the flow. The problem is that one of the mapped tasks is failing but not being retried, and I get no indication at the end of the flow that some tasks failed. To be precise, the logs show that one retry is being called, but the task should fail again. Instead of failing, it even completes. Any ideas on why this is happening?
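A minimal sketch of the wiring as I read it from the snippets above; the configs list and the flow name are made-up placeholders:

from prefect import Flow

configs = [{"source": "a"}, {"source": "b"}]  # placeholder for the real config dicts

with Flow("sync-loads") as flow:
    loads = run_sync_loads.map(config=configs)       # mapped task with retries
    save_model_config(loads=loads, configs=configs)  # downstream task with always_run trigger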
itay livni
10/22/2019, 3:46 AM

Mark McDonald
10/22/2019, 5:06 PM
with Flow('test flow') as f:
    data = get_data()
    cleansed_data = munge_data(data)
    send_data = email_data(cleansed_data)
    alert = send_error_email()
    alert.set_upstream(send_data)

f.set_reference_tasks([send_data])
I want my alert task to trigger if any of the tasks above fail. So, in my task definition I have this set:
@task(trigger = prefect.triggers.any_failed)
def send_error_email():
When the flow runs, the first three tasks complete successfully; however, the alert task fails because its trigger fails (TRIGGERFAIL signal raised during execution).
However, the overall Flow status is a success because I've set the flow's reference task to the send_data task.
Is this the correct way to use the "any_failed" trigger? It seems to me that it might be more natural for the alert task to end up in a "skipped" state if none of the upstream tasks failed.
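For reference, a sketch of how the resulting states could be inspected after a local run of the f flow above; TriggerFailed is the state class behind the TRIGGERFAIL signal:

from prefect.engine.state import TriggerFailed

state = f.run()
alert_state = state.result[alert]              # state of the send_error_email task
print(isinstance(alert_state, TriggerFailed))  # True when no upstream task failed
print(state.is_successful())                   # True: flow state follows the send_data reference task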
An Hoang
10/22/2019, 5:18 PM

Matt Harvey
10/22/2019, 7:47 PM

Adam Roderick
10/22/2019, 8:08 PM

Adam Roderick
10/22/2019, 8:08 PM

Adam Roderick
10/22/2019, 8:08 PM

itay livni
10/22/2019, 8:36 PM

itay livni
10/22/2019, 8:37 PM
ifelse? Thanks

itay livni
10/24/2019, 4:25 AM
Both tasks return pandas dataframes. However, the merge fails with a pandas error:
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
Without the merge command, both return valid dataframes. Any suggestions?
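A minimal sketch of the shape I think is being described, using prefect's control-flow tasks; the condition and branch tasks here are made-up placeholders that return DataFrames:

import pandas as pd
from prefect import Flow, task
from prefect.tasks.control_flow import ifelse, merge

@task
def check_condition() -> bool:
    return True

@task
def branch_a() -> pd.DataFrame:
    return pd.DataFrame({"x": [1, 2]})

@task
def branch_b() -> pd.DataFrame:
    return pd.DataFrame({"x": [3, 4]})

with Flow("merge-dataframes") as flow:
    cond = check_condition()
    a, b = branch_a(), branch_b()
    ifelse(cond, a, b)
    result = merge(a, b)  # reportedly where the ambiguous-truth-value error is raised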
Mark McDonald
10/24/2019, 8:15 PM
My deployment script does the following:
1. Gathers my dependencies
2. Installs these dependencies on my docker image
3. Builds prefect docker storage object:
storage = Docker(registry_url=os.getenv('PREFECT_ECR_REGISTRY'), image_name=os.getenv('PREFECT_ECR_IMAGE_NAME'),
                 image_tag=package_version, python_dependencies=dependencies)
4. Imports each of my flow files and locates the flow objects within them; these flows get added to the storage object
5. Calls storage.build()
6. Iterates through the flows and calls flow.deploy() on each flow object, with build set to False
As it stands, the deployment takes ~5 minutes. Are there any areas where I might be able to improve this?
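For context, a rough sketch of what I understand steps 4-6 to look like; flows is assumed to be the list of imported Flow objects, and the project name is a placeholder:

# flows: Flow objects collected from the imported flow files (step 4)
for flow in flows:
    storage.add_flow(flow)

storage = storage.build()  # step 5: one image build for all flows

for flow in flows:
    flow.storage = storage
    flow.deploy(project_name="my-project", build=False)  # step 6: no per-flow rebuilds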
itay livni
10/24/2019, 8:36 PM
Is there a way to pass a filepath to visualize? Using the script from the visualize docs, I added filename as an arg, but I do not see the write to S3. I am assuming that s3fs is available for writing this file (if that is the right terminology)?
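For what it's worth, a sketch of the pattern I'd expect, since visualize renders locally via graphviz and the upload to S3 would be a separate step; the flow variable, bucket, and file names are placeholders:

import s3fs

# renders flow_graph.pdf (default graphviz format) next to the DOT source
flow.visualize(filename="flow_graph")

# explicitly upload the rendered file; bucket/key are made up
fs = s3fs.S3FileSystem()
fs.put("flow_graph.pdf", "s3://my-bucket/flow_graph.pdf")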
Daniel Veenstra
10/24/2019, 10:05 PM

Egbert Ypma
10/25/2019, 3:03 PM

Brian Mesick
10/25/2019, 5:44 PM

itay livni
10/26/2019, 5:53 AM
Is there a way to check whether a task completed and then do something? Something like TaskRunner's state.is_successful(), but in a flow.
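One way to read this: a sketch of checking a task's state from the state returned by a local run; flow and my_task are placeholders for the Flow and the task object of interest:

flow_state = flow.run()
task_state = flow_state.result[my_task]  # per-task states keyed by task object

if task_state.is_successful():
    print("task completed:", task_state.result)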
Matias
10/26/2019, 9:30 AM

osuarez
10/29/2019, 9:14 AM

Adam Roderick
10/29/2019, 11:12 AM
When I call flow.run() locally, I see all the stdout information from these subprocesses in the Prefect log output, but I do not see the same information in Prefect Cloud. Is there a configuration setting or something I can update to ensure I see this output in the cloud logs?
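One pattern that may be relevant, sketched under the assumption that the subprocesses are launched from inside tasks: route their stdout through the task's logger, which is what Prefect ships with the run logs; the command is a placeholder:

import subprocess

import prefect
from prefect import task

@task
def run_subprocess():
    logger = prefect.context.get("logger")  # the task-run logger
    proc = subprocess.run(
        ["echo", "hello"],  # placeholder command
        capture_output=True,
        text=True,
        check=True,
    )
    logger.info(proc.stdout)  # forwarded to Prefect's logs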
Arlo Bryer
10/29/2019, 3:58 PM