Pedro Machado
10/13/2020, 11:16 PM
Krzysztof Nawara
10/14/2020, 8:05 AM
Alberto de Santos
10/14/2020, 8:50 AM
as
10/14/2020, 11:43 AM
Raphaël Riel
10/14/2020, 12:44 PM
the_task.map(list_of_ints). The Task will have to map >10 items.
When A) running the flow directly from within the .py file using flow.run()
AND B) setting executor=LocalDaskExecutor()
I’m able to have it use more than 1 thread.
But as soon as I try to run this Flow in an Agent OR if I remove the Dask Executor (while executing the .py file directly), I can’t make it run in parallel!
Recap:
1. Execute the flow from the .py file WITH executor=LocalDaskExecutor() = Works
2. Execute the flow from the .py file with the “default” Executor = Nope
3. Any combination of executor running in an agent = Nope
Any suggestion will be welcome! Thanks.
Jeff Brainerd
10/14/2020, 12:54 PM
{
  "graphQLErrors": [
    {
      "path": [
        "flow_run"
      ],
      "message": "Operation timed out",
      "extensions": {
        "code": "API_ERROR"
      }
    }
  ],
  "networkError": null,
  "message": "GraphQL error: Operation timed out"
}
Aaron Y
10/14/2020, 3:57 PM
from ... import ??
def test_say_hello():
    assert say_hello() == True
PostProcessingTemplate/
    PostProcessingFlow.py
    PostProcessingTasks.py
    tests/
        test_PostProcessingTasks.py
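One way the missing import could be filled in is sketched below. It assumes that tests/ sits inside PostProcessingTemplate/ and that say_hello is defined in PostProcessingTasks.py; neither is confirmed by the message. The layout is rebuilt in a temporary directory so the snippet runs on its own, and the body of say_hello is a hypothetical stand-in.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Recreate the assumed layout in a temporary directory.
root = Path(tempfile.mkdtemp()) / "PostProcessingTemplate"
(root / "tests").mkdir(parents=True)
# Stand-in for the real module; this say_hello body is hypothetical.
(root / "PostProcessingTasks.py").write_text("def say_hello():\n    return True\n")

# What tests/test_PostProcessingTasks.py could do: put the folder that
# contains PostProcessingTasks.py (the parent of tests/) on sys.path.
test_file = root / "tests" / "test_PostProcessingTasks.py"
sys.path.insert(0, str(test_file.parent.parent))
importlib.invalidate_caches()

from PostProcessingTasks import say_hello


def test_say_hello():
    assert say_hello() == True


test_say_hello()
```

In a real test file the same effect would come from using Path(__file__).resolve().parent.parent in place of test_file.parent.parent. Running `python -m pytest` from inside PostProcessingTemplate/ is another option, since that puts the current directory on sys.path.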
Krzysztof Nawara
10/14/2020, 5:54 PM
Tom Augspurger
10/14/2020, 5:56 PM
DaskKubernetesEnvironment environment with a custom scheduler_spec_file / worker_spec_file, and GitHub storage together?
For pangeo-forge, we don't want our users to worry about things like storage / execution environments if they don't need to, so we provide a default: https://github.com/pangeo-forge/pangeo-forge/pull/14/files#diff-467822c6f6378f68bea635c429827a2caf36c7f16cb25944cc7b5146262cf35aR32-R68. Users just write their pipeline and push it to GitHub (e.g. https://github.com/TomAugspurger/example-pipeline/blob/main/recipe/pipeline.py#L30-L41).
When I register and run a flow with this setup, I notice that my custom spec files aren't being used (defined at https://github.com/pangeo-forge/pangeo-forge/pull/14/files#diff-267b30d97c826b0afcae2110fe8ca4acfe6f35a6321d80f5fcc74ea9b7547fc0). We just need to update the ServiceAccount to be pangeo-forge rather than default. So my questions would be:
1. Is it common to use DaskKubernetesEnvironment and GitHub storage, rather than Docker storage?
2. Any suggestions on debugging why my custom spec files aren't being used? When I used Docker storage they were used (but I've changed other things too).
kkkkkkk
Mitchell Bregman
10/14/2020, 6:35 PM
pip install some internal dependencies that live within a private PyPI registry?
Alberto de Santos
10/14/2020, 7:51 PM
Alberto de Santos
10/14/2020, 7:51 PM
'azure-flow-storage', 'gcs-flow-storage', 's3-flow-storage',
'github-flow-storage', 'webhook-flow-storage'
Even when setting up the TOML file with labels, they still appear.
Robin
10/14/2020, 9:55 PM
flow.run_config instead of flow.environment because the DEBUG logging on AWS EKS did not work with flow.environment.
The good news: DEBUG level logging now works!
The bad news: the tasks are not executed in parallel, even after setting flow.executor = DaskExecutor()
Do you know how to enable parallelization?
Robin
10/14/2020, 9:59 PM
prefect run flow --name "e3dc-all_tasks-flow" --project "eks_test_01" -ps '{"system_id_index_range": [0, 200]}'
However, Prefect also suggests this command on Windows.
Not a big issue for us, but for some users it might be ...
Ognjen Nikolic
10/14/2020, 11:01 PM
cluster = FargateCluster(
    image="xxxxxxxxx.dkr.ecr.us-west-2.amazonaws.com/prefect-dask:latest",
    scheduler_cpu=1024,
    scheduler_mem=4096,
    worker_cpu=256,
    worker_mem=512,
    cloudwatch_logs_group="prefect-dask-test",
    task_role_policies=['arn:aws:iam::aws:policy/AmazonS3FullAccess',
                        'arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryFullAccess']
)
# Prefect task definitions
create_container = CreateContainer(
    image_name="ubuntu",
    command="ls",
    volumes=['/var/run/docker.sock', '/var/run/docker.sock']
)
start = StartContainer()
logs = GetContainerLogs()
wait = WaitOnContainer()
# Prefect Flow definition
with Flow("Prefect Test Workflow") as flow:
    container_id = create_container()
    start = start(container_id=container_id)
    wait = wait(container_id=container_id)
    logs = logs(container_id=container_id)
    logs.set_upstream(wait)
state = flow.run(executor=DaskExecutor(address=cluster.scheduler_address))
Error encountered:
[2020-10-14 22:45:03] INFO - prefect.TaskRunner | Task 'CreateContainer': Starting task run...
[2020-10-14 22:45:03] ERROR - prefect.TaskRunner | Unexpected error: DockerException("Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))")
Any help would be greatly appreciated for how to correctly run Docker images as tasks within Prefect on FargateCluster.
Isaac Brodsky
10/14/2020, 11:38 PM
Rob Fowler
10/15/2020, 2:24 AM
I get: | Flow run FAILED: some reference tasks failed on account of it never scheduling one of the slow_task workers.
If there is any load on the machine, it fails. If the machine is at 1% CPU, it works.
# python slow.py --range=10 --sleep_time=3
from time import sleep
import argparse
from prefect import Flow, Parameter, unmapped, task, context
from prefect.engine.executors import LocalDaskExecutor
@task(timeout=9)
def slow_task(opts, item, scripts):
    logger = context.get('logger')
    logger.info(f"==== IN TASK {item} Sleeping {opts.sleep_time}")
    sleep(opts.sleep_time)
    logger.info(f"## Awake {item}")
    return item
@task
def produce_range(opts):
    return range(opts.range)
with Flow("PS Version") as flow:
    scripts = Parameter('scripts')
    opts = Parameter('opts')
    nrange = produce_range(opts)
    results = slow_task.map(item=nrange,
                            scripts=unmapped(scripts),
                            opts=unmapped(opts))
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='test pywinrm')
    parser.add_argument('--workers', type=int, default=10)
    parser.add_argument('--sleep_time', type=int, default=2)
    parser.add_argument('--range', type=int, default=10)
    opts = parser.parse_args()
    executor = LocalDaskExecutor(num_workers=opts.workers)
    flow.run(executor=executor,
             scripts="hello",
             opts=opts)
Rob Fowler
10/15/2020, 7:55 AM
Yanghui Ou
10/15/2020, 12:44 PM
class GenerateFile( Task ):
    def run( self ):
        with open( 'result.txt', 'w' ) as f:
            f.write( f'This file is generated by {self.name}.' )
class ProcessFile( Task ):
    def run( self ):
        with open( 'result.txt', 'r' ) as f:
            print( f.read() )
gen_task = GenerateFile()
print_task = ProcessFile()
with Flow( 'test caching' ) as flow:
    gen_result = gen_task()
    print_result = print_task( upstream_tasks=[ gen_result ] )
Is there a better way to do it other than manually setting the upstream_tasks?
Another question is how can I specify the generated file as a target such that I get the same caching behavior as make? I tried
gen_task = GenerateFile( target='result.txt', checkpoint=True, result=LocalResult( dir='.' ) )
but it does not seem to work.
Marwan Sarieddine
10/15/2020, 1:19 PM
raphBL
10/15/2020, 1:54 PM
Newskooler
10/15/2020, 1:57 PM
Newskooler
10/15/2020, 2:21 PM
Alberto de Santos
10/15/2020, 4:15 PM
Alexander
10/15/2020, 8:29 PM
docker run this image and call the flow registration script (which I wrote myself)
3. In this script, I iterate over all Python scripts in the flows folder, import each script (instead of the exec approach used in extract_flow_from_file or whatever), and put its flow object into a list
4. Create Docker storage with the desired settings (it uses the same production environment Dockerfile used in step #1) and add all flows to this storage
5. Build this storage
6. Assign the built storage object to all flows
7. Register all flows. I am lucky that all flows are in the same project and have the same registration settings (for now). It will be painful to come up with an approach for per-flow registration customization in such a generic script
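Step 3 above (importing each script and collecting its flow object) could look roughly like the sketch below. It uses only the standard library and assumes each script exposes a module-level variable named flow; the helper name collect_flows and the demo file names are hypothetical. The stand-in scripts hold plain strings where real scripts would define Prefect Flow objects, and the storage, build, and registration steps are not shown.

```python
import importlib.util
import tempfile
from pathlib import Path


def collect_flows(folder, attr="flow"):
    """Import every .py file in `folder` and return the objects named `attr`."""
    flows = []
    for path in sorted(Path(folder).glob("*.py")):
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)  # runs the script's top-level code
        if hasattr(module, attr):
            flows.append(getattr(module, attr))
    return flows


# Demo with stand-in scripts in a temporary folder.
demo = Path(tempfile.mkdtemp())
(demo / "etl.py").write_text("flow = 'etl-flow'\n")
(demo / "report.py").write_text("flow = 'report-flow'\n")
(demo / "helpers.py").write_text("x = 1\n")  # no flow variable, so skipped

flows = collect_flows(demo)
print(flows)  # ['etl-flow', 'report-flow']
```

Because exec_module runs each script's top-level code, any flow.run() or flow.register() call in a script should sit behind an `if __name__ == '__main__':` guard so that collecting the flows has no side effects.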
All this required significant experimentation and reading the Prefect source code (it is magnificent, no joke; I had a real pleasure reading it). I wish there were best practices in the Prefect docs about flow registration and production CI setup.
I am curious: what are the best practices in the Prefect community for production flow registration? What is your preferred way of running tasks? How do you deliver flow source code to prod?
Alexander
10/15/2020, 8:33 PM
josh
10/15/2020, 9:15 PM
0.13.11 has been released and here are a few notable changes:
🕒 Per clock labels in schedules
📦 Gitlab storage
⬆️ Auto-upload flow scripts to S3 and GCS
🪲 Several bug fixes
🎃 So many Hacktoberfest contributors
A big thank you to our contributors who helped out with this release! Full changelog:
Robin
10/15/2020, 10:30 PM
coiled.Cloud() explicitly? Or change the coiled config path in the docker image? 🙂
For further context, see failed task id:
https://cloud.prefect.io/accure/flow-run/935509e3-a3d5-41db-8cb4-0e9ca36ff56a?logId=e0a0cc4e-1f70-46df-b4cd-078c67082ac3
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
    with self.check_for_cancellation(), executor.start():
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/executors/dask.py", line 260, in start
    with self.cluster_class(**self.cluster_kwargs) as cluster: # type: ignore
  File "/usr/local/lib/python3.8/site-packages/coiled/cluster.py", line 105, in __init__
    self.cloud = cloud or Cloud.current(asynchronous=asynchronous)
  File "/usr/local/lib/python3.8/site-packages/coiled/core.py", line 180, in current
    raise ValueError("Please first connect with coiled.Cloud(...)")
ValueError: Please first connect with coiled.Cloud(...)
Maybe the error message explicitly describes what to do, but it's not entirely forwarded by Prefect?
Asif Imran
10/16/2020, 2:48 AM
flow.environment = DaskKubernetesEnvironment(
    worker_spec_file="worker_spec.yaml",
)
Somehow my flow got into a funk where I can no longer update the spec. Even if I (re)-register the flow and push it to cloud+run, on my cluster a quick k pod describe reveals the very spec that I am trying to get rid of. Thus far I have tried deleting the flow and starting from scratch. I am pretty sure I don't want to have to rename the flow every time I bump the version
Prathamesh
10/16/2020, 4:22 AM
def func_a():
    do something
Function_B script:
def func_b():
    do something
Prefect script:
from Function_A import func_a
from Function_B import func_b
@task
def executing_func_a():
    func_a()
@task
def executing_func_b():
    func_b()
.....flow register.....
Flow registers successfully. When executing flow, received an error:
ModuleNotFoundError("No module named _func_a")_
Please advise.
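One possible cause (an assumption; the message does not confirm it) is that Function_A.py and Function_B.py are importable on the machine that registers the flow but not in the environment that executes it. A quick standard-library check that could be run in the execution environment is sketched below; the helper name missing_modules is hypothetical.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names from `names` that cannot be found."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Function_A and Function_B are the module names from the message above.
print(missing_modules(["Function_A", "Function_B"]))
```

An empty list means both modules can be found on sys.path in that environment. Any name that is printed would need to be shipped alongside the flow, for example by installing it into the image or putting its folder on the Python path.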