Sandeep Aggarwal
06/30/2020, 1:36 PM
Darragh
06/30/2020, 1:48 PM
Darragh
06/30/2020, 3:06 PM
Philip Blankenau
06/30/2020, 3:44 PM
Traceback (most recent call last):
File "C:\Users\pblankenau\Anaconda3\envs\gis\lib\site-packages\prefect\engine\runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "C:\Users\pblankenau\Anaconda3\envs\gis\lib\site-packages\prefect\engine\task_runner.py", line 661, in check_target
new_res = result.read(target.format(**prefect.context))
File "C:\Users\pblankenau\Anaconda3\envs\gis\lib\site-packages\prefect\engine\results\local_result.py", line 79, in read
new.value = cloudpickle.loads(f.read())
EOFError: Ran out of input
import shutil
from pathlib import Path

from prefect import Flow, Task
from prefect.engine.results import LocalResult

class Copy(Task):
    def run(self):
        src = Path(r"D:\test\dumb.txt")
        dst = Path(r"D:\test\copy_in_here\dumb.txt")
        shutil.copyfile(src, dst)
        return dst

with Flow("cp") as flow:
    dst = Path(r"D:\test\copy_in_here\dumb.txt")
    result = Copy(target=str(dst), checkpoint=True, result=LocalResult())()

flow.run()
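A plausible cause, offered as a sketch rather than a confirmed diagnosis: the target is the same path that shutil.copyfile writes to, so on a re-run Prefect finds the file and LocalResult.read tries to cloudpickle.loads a plain text file (an empty one raises exactly "EOFError: Ran out of input"). Pointing the cache target somewhere other than the task's output avoids the collision; the template and directory below are illustrative, not from the thread.

from prefect import Flow
from prefect.engine.results import LocalResult

with Flow("cp") as flow:
    # Reusing the Copy task defined above; the target now names a separate
    # cache file (templated via prefect.context) instead of the copied file.
    # The results directory is a hypothetical location.
    result = Copy(
        target="{task_name}.prefect",
        checkpoint=True,
        result=LocalResult(dir=r"D:\test\prefect_results"),
    )()

flow.run()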
Rafal
06/30/2020, 6:00 PM
Vikram Iyer
06/30/2020, 7:11 PM
prefect agent start docker --network prefect-server
I am still not able to execute the flows from the UI. Is there anything I am doing wrong?
dh
06/30/2020, 7:50 PM
Avi A
06/30/2020, 8:22 PM
I'm using GCSResult in my flow and getting errors from GCS (caused by the GCSResult object trying to store task results):
Unexpected error: SSLError(MaxRetryError("HTTPSConnectionPool(host='storage.googleapis.com', port=443): Max retries exceeded with url: /upload/storage/v1/b/prefect-bucket/o?uploadType=multipart (Caused by SSLError(OSError(24, 'Too many open files')))"))
The reason is (probably) that I have many mapped tasks, and they are running in parallel. It’s a 32-core machine, but I can’t really figure out how Prefect/Dask decides how many run in parallel. Sometimes it’s more than 32.
The bigger problem is that I think it caused the flow to get stuck, so I didn't even get an indication of the failure and couldn't go and restart it.
Anyhow, any suggestions on how to overcome this? Or, at the very least, how to make the flow fail on such errors so that I can restart it? Does it have anything to do with the fact that these tasks have a retry option?
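One hedged way to bound the fan-out, assuming a Prefect version whose DaskExecutor accepts cluster_kwargs (as used elsewhere in this thread); the worker counts are illustrative:

from prefect.engine.executors import DaskExecutor

# Capping the local Dask cluster explicitly keeps the number of concurrent
# mapped tasks (and open sockets/file handles) bounded; by default Dask
# sizes the cluster from the CPU count, which can exceed the OS file limit.
executor = DaskExecutor(cluster_kwargs={"n_workers": 8, "threads_per_worker": 1})
flow.run(executor=executor)  # `flow` is the flow containing the mapped tasks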
Zach Angell
06/30/2020, 8:50 PM
Philip MacMenamin
06/30/2020, 9:25 PM
a = task_a()
b = task_b(a)
task_c(upstream_tasks=[task_b])
This produces a strange-looking DAG. I would expect it to produce a DAG like
task_a ---> task_b ---> task_c
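For comparison, a minimal sketch of what should yield that linear DAG, assuming the tasks are @task-decorated functions: pass the invoked task (the value b), not the task function itself, to upstream_tasks.

from prefect import Flow, task

@task
def task_a():
    return 1

@task
def task_b(x):
    return x + 1

@task
def task_c():
    pass

with Flow("linear") as flow:
    a = task_a()
    b = task_b(a)
    # Depending on the call result `b` (not the function task_b) produces
    # the expected chain task_a ---> task_b ---> task_c.
    task_c(upstream_tasks=[b])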
Benjamin
06/30/2020, 9:50 PM
with Flow("standardization") as flow:
    df = read_feats(bucket, file_pattern)  # a task that returns a Dask dataframe using dd.read_parquet(...)
    scaled_df = scale(df)  # a Prefect task that does the standardization using dask-ml's StandardScaler
    write_csv_to_s3(scaled_df, bucket, output_file)  # writes the Dask dataframe as a CSV to an S3 bucket
This is a very simple POC just to get things running and see Prefect in action. I'm using version 0.12.
Everything runs smoothly if I run the flow locally using flow.run with a remote DaskExecutor in a FargateCluster (using dask-cloudprovider):
executor = DaskExecutor(...)  # cluster parameters: FargateCluster from dask-cloudprovider, plus cluster_kwargs
flow.run(executor=executor)
I start having problems when I try to run it using a local Prefect agent connected to a local Prefect server after registering it:
executor = DaskExecutor(...) # same parameters as before
flow.environment = LocalEnvironment(executor=executor)
flow.register()
The agent deploys the flow, creates the FargateCluster normally, and we can see the tasks registered in the Dask UI's Task Stream, but no processing actually happens. It deserializes the first task and does nothing, then does the same with the second and third tasks.
Any idea what I am doing wrong here?
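A hedged sketch of one registration pattern that avoids shipping a live cluster handle, assuming a Prefect version whose DaskExecutor accepts cluster_class/cluster_kwargs: give the executor the cluster class so each agent-side flow run builds its own FargateCluster at execution time. Image and worker count are placeholders.

from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

executor = DaskExecutor(
    cluster_class="dask_cloudprovider.FargateCluster",  # resolved at run time
    cluster_kwargs={"image": "prefecthq/prefect:latest", "n_workers": 4},
)
flow.environment = LocalEnvironment(executor=executor)
flow.register()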
Luis Muniz
06/30/2020, 10:13 PM
Alfie
07/01/2020, 2:46 AM
Vikram Iyer
07/01/2020, 8:55 AM
Matthias
07/01/2020, 1:07 PM
flow.run() is running synchronously here, right?
Jacques
07/01/2020, 1:23 PM
Scott Zelenka
07/01/2020, 1:42 PM
[2020-07-01 13:40:32] DEBUG - prefect.CloudFlowRunner | Failed to retrieve flow state with error: AttributeError("'SSLSocket' object has no attribute 'connection'")
The Flow runs fine on my local machine, and the healthcheck passes, but when it's deployed to K8s, it seems the CloudFlowRunner cannot write logs to Prefect Cloud and aborts the execution of the Flow.
Michael C. Grant
07/01/2020, 2:23 PM
Zach Angell
07/01/2020, 2:30 PM
is_serializable(flow, True) gives me some debugging output, but the error isn't particularly revealing:
subprocess.CalledProcessError: Command 'python /var/folders/vz/kjj4n5wj2bb7q26n0kz4v4jw0000gp/T/tmpr1ux82t1' returned non-zero exit status 1.
Is there a way to get more detail from the debugger?
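One hedged trick: is_serializable round-trips the object in a subprocess (hence the bare CalledProcessError above), so pickling in-process usually surfaces the underlying error with a full traceback.

import cloudpickle

# Round-trip the flow in the current process; whatever attribute is not
# picklable will raise here directly instead of inside a child process.
cloudpickle.loads(cloudpickle.dumps(flow))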
Scott Zelenka
07/01/2020, 3:06 PM
When using S3Result instead of S3ResultHandler, do we need to have a PrefectSecret named AWS_CREDENTIALS?
Previously with S3ResultHandler there was a kwarg for `aws_credentials_secret` (which allowed us to specify different credentials for different Flows), but that doesn't seem to be present on the new S3Result. Trying to figure out how to pass those credentials to S3Result.
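A hedged sketch of the secret-based route, assuming S3Result falls back to the AWS_CREDENTIALS Prefect Secret (a dict with ACCESS_KEY and SECRET_ACCESS_KEY) when no explicit credentials are given, mirroring the old handler's default; bucket name and key values are placeholders.

import prefect
from prefect.engine.results import S3Result

result = S3Result(bucket="my-prefect-results")

# For a local test, secrets can be injected through prefect.context;
# in Cloud they would live in the AWS_CREDENTIALS secret instead.
with prefect.context(secrets={"AWS_CREDENTIALS": {
    "ACCESS_KEY": "<access-key>",
    "SECRET_ACCESS_KEY": "<secret-key>",
}}):
    stored = result.write({"hello": "world"})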
Matt Allen
07/01/2020, 3:42 PM
Vitor Avancini
07/01/2020, 4:12 PM
itay livni
07/01/2020, 4:45 PM
prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': '[{\'extensions\': {\'path\': \'$.variableValues\', \'code\': \'validation-failed\'}, \'message\': "no such type exists in the schema: \'flow_group_insert_input\'"}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Thanks
Vikram Iyer
07/01/2020, 7:02 PM
I have one container running my app, myapp, and another container which is actually the prefect agent that will be running the flows. I figured out that the agent needs to have the code that is in myapp. So, I set up the flow like below:
flow.storage = Docker(dockerfile='path_of_Dockerfile_inside_myapp')  # this ensures the Dockerfile used to build myapp is also used to build the agent's image
flow.storage.build()  # this step fails with the error FileNotFound; I looked at the prefect code in the GitHub repo and this should work, but it fails
flow.register(labels=['some_label'])
Is there a gap in my understanding of how Docker storage works?
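A hedged guess at the FileNotFound, assuming a Prefect version whose Docker storage accepts a dockerfile kwarg (as referenced above): the build runs from a temporary context, so a relative Dockerfile path may not resolve. An absolute path (placeholder below) sidesteps that.

from prefect.environments.storage import Docker

flow.storage = Docker(
    dockerfile="/absolute/path/to/myapp/Dockerfile",  # placeholder absolute path
    registry_url="my-registry.example.com",           # placeholder registry
)
flow.register(labels=["some_label"])  # register() builds and pushes the image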
Hannah Amundson
07/01/2020, 7:32 PM
Nicolas van de Walle
07/02/2020, 10:07 AM
Luis Muniz
07/02/2020, 2:13 PM
We are trying to use register() to push a flow to an ECR (Amazon) private registry, but we can't figure out how prefect picks up the credentials to do it. We know that with the command-line docker build and docker push, you need to execute a special docker login using the aws ecr get-login... command, but we don't know how to pass this on to prefect for register(). We feel like we're missing a step.
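A hedged sketch of one way to do the login programmatically before register(), on the assumption that Prefect's Docker storage pushes through the local Docker daemon and therefore reuses whatever credentials that daemon holds; the region is a placeholder.

import base64

import boto3
import docker

# Fetch a temporary ECR token and log the local Docker daemon into the
# registry; a subsequent flow.register() push should then be authorized.
ecr = boto3.client("ecr", region_name="us-east-1")
auth = ecr.get_authorization_token()["authorizationData"][0]
username, password = base64.b64decode(auth["authorizationToken"]).decode().split(":")

client = docker.from_env()
client.login(username=username, password=password, registry=auth["proxyEndpoint"])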
Dan Ball
07/02/2020, 5:54 PM
Benjamin
07/02/2020, 7:30 PM
Everything works with flow.run(executor=executor) but not with flow.register().
We're using the FargateCluster to create a new cluster for the flow, and it's set up properly with flow.register, but no processing is done. I asked about this a couple of days ago here, but now I have a reproducible example.
I'll provide the details in this thread.
Thanks a lot
Bob Colner
07/02/2020, 9:58 PM