Ryan Abernathey
03/25/2021, 1:56 PM
I use Client.register_worker_plugin (https://docs.dask.org/en/latest/futures.html#distributed.Client.register_worker_plugin) with my Dask clusters to install custom pip packages (e.g. installing from a dev branch for debugging). If I am using Prefect with an existing Dask scheduler, how can I call client.register_worker_plugin? I tried
executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security}
)
executor.client.register_worker_plugin
but it looks like the client attribute on executor is None.
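One workaround sketch, assuming the scheduler is reachable from wherever this code runs: register the plugin through a plain distributed Client pointed at the same scheduler, since the executor's client only exists while a flow run is executing. Here my_plugin is a stand-in for your WorkerPlugin instance.

from distributed import Client

# cluster is the object from the snippet above; my_plugin is your WorkerPlugin instance
with Client(cluster.scheduler_address, security=cluster.security) as client:
    client.register_worker_plugin(my_plugin)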
Amit Gal
03/25/2021, 2:11 PM

from prefect import Flow, task
@task()
def some_task(x):
    return x + 1

some_list = [1, 2, 3, 4]

with Flow("list_comp") as list_comp_flow:
    mapped_result = some_task.map(some_list)
    list_comp = [result + 1 for result in mapped_result]
results in the following error:
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
Which makes sense of course, since while building the flow, mapped_result is still a Task, not a list.
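A sketch of the usual fix: move the per-element work into a second mapped task instead of a list comprehension at build time (add_one is a made-up name).

@task()
def add_one(x):
    return x + 1

with Flow("list_comp") as list_comp_flow:
    mapped_result = some_task.map(some_list)
    list_comp = add_one.map(mapped_result)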
Steve Aby
03/25/2021, 4:13 PM

Marwan Sarieddine
03/25/2021, 4:17 PM

Josh Greenhalgh
03/25/2021, 4:54 PM
RunNamespacedJob task); how can I force it to defer trying to access the cluster until it's actually running?

ciaran
03/25/2021, 5:38 PM
Failed to load and execute Flow's environment: StorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with '3.8.6', currently running with '3.7.10')")
Has anyone seen this?
For background I'm using an ECS Agent.
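A sketch of one fix, going off the version hint in that error: make the registration environment and the run image use the same Python minor version, e.g. by pinning the run config to a matching Prefect base image (the exact tag below is an assumption).

from prefect.run_configs import ECSRun

# flow was built with Python 3.8, so run it on a Python 3.8 image too
flow.run_config = ECSRun(image="prefecthq/prefect:0.14.13-python3.8")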
ciaran
03/25/2021, 6:19 PM

Renzo Becerra
03/25/2021, 6:21 PM

Berty
03/25/2021, 7:30 PM
I'm getting NET::ERR_CERT_COMMON_NAME_INVALID for prefect.io

Jack Sundberg
03/25/2021, 7:48 PM

Trevor Kramer
03/25/2021, 10:15 PM
[2021-03-25 17:12:55-0500] DEBUG - prefect.TaskRunner | Task 'AWSClientWait': Handling state change from Running to Success
[2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'AWSClientWait': Finished task run for task with final state: 'Success'
[2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'pruning': Starting task run...
[2021-03-25 17:12:55-0500] DEBUG - prefect.TaskRunner | Task 'pruning': Handling state change from Pending to Failed
[2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'pruning': Finished task run for task with final state: 'Failed'
It seems to be failing before it even gets called.
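A Pending-to-Failed jump with no Running state in between usually points at a trigger or upstream problem rather than the task body. One debugging sketch (debug_handler is a made-up name): attach a state handler that surfaces the failure message.

from prefect import task

def debug_handler(task, old_state, new_state):
    # log why the task failed, including trigger/upstream failures
    if new_state.is_failed():
        print(f"{task.name} failed: {new_state.message}")
    return new_state

@task(state_handlers=[debug_handler])
def pruning():
    ...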
matta
03/25/2021, 11:30 PM

Po Stevanus
03/26/2021, 8:09 AM
Failed to load and execute Flow's environment: Forbidden('GET <https://storage.googleapis.com/storage/v1/b/bonza-dev-files?projection=noAcl&prettyPrint=false>: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')
Background:
• Agent: Kubernetes Autopilot
• Storage: Google Cloud Storage
What I've done:
1. I prepared a service account with the Storage.Admin role; I called it OLYMPUS_DEV_SA
2. When declaring the storage I referenced it:
storage = GCS(bucket="bonza-dev-files", project="bonza-dev", secrets=["OLYMPUS_DEV_SA"])
If it helps, I've attached the code to this thread.
Is there a way to debug this? 🙇
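One way to narrow this down (a sketch; the key filename is a placeholder): exercise the exact permission the error names, storage.buckets.get, with the same service account but outside of Prefect.

from google.cloud import storage

client = storage.Client.from_service_account_json("olympus_dev_sa.json")
bucket = client.get_bucket("bonza-dev-files")  # this call performs storage.buckets.get
print(bucket.name)

If this also fails, the IAM binding is the problem; if it succeeds, the OLYMPUS_DEV_SA secret may not be reaching the pod that runs the flow.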
Mahesh
03/26/2021, 10:23 AM

ciaran
03/26/2021, 11:48 AM
I can't install dask-cloudprovider[aws]==2021.3.0 when I have prefect[aws]==0.14.13 installed. It seems they have different requirements for botocore:
There are incompatible versions in the resolved dependencies:
botocore<1.19.53,>=1.19.52 (from aiobotocore==1.2.2->dask-cloudprovider[aws]==2021.3.0->-r /var/folders/kf/93zlmdv15vz6sjhr2xd0j7y40000gn/T/pipenv_bvj4rpkrequirements/pipenv-1_o8bqwg-constraints.txt (line 6))
botocore<1.21.0,>=1.20.38 (from boto3==1.17.38->prefect[aws]==0.14.13->-r /var/folders/kf/93zlmdv15vz6sjhr2xd0j7y40000gn/T/pipenv_bvj4rpkrequirements/pipenv-1_o8bqwg-constraints.txt (line 5))
Is there a specific version of dask-cloudprovider that prefect works with?

Greg Roche
03/26/2021, 3:18 PM
Calling flow.register() on such a flow fails with the error 'list' object has no attribute 'update'. A reproducible example is in the thread. Has anybody come across this before or know how to resolve it?

Ben Fogelson
03/26/2021, 3:26 PM
Is there a way to have a task trigger like all_finished, but that does different things depending on whether it would have been triggered by all_successful or some_failed?
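A sketch of one pattern that gets close, under the assumption that with an all_finished trigger a failed upstream's value arrives downstream as the raised exception (handle is a made-up name):

from prefect import task
from prefect.triggers import all_finished

@task(trigger=all_finished)
def handle(upstream_result):
    # a failed upstream arrives as its exception, so branch on that
    if isinstance(upstream_result, BaseException):
        ...  # the "some_failed" path
    else:
        ...  # the "all_successful" path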
Ryan Abernathey
03/26/2021, 3:37 PM

liren zhang
03/26/2021, 3:47 PM
I was wondering whether StartFlowRun(flow_name="A", project_name="examples", wait=True)
can execute a child flow that has not been registered. Basically, I would like to test everything locally before I register anything.
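StartFlowRun goes through the backend, so one local-testing sketch (the task and flow names here are illustrative) is to call the child flow's run() directly before anything is registered:

from prefect import Flow, task

@task
def hello():
    print("hello from flow A")

with Flow("A") as flow_a:
    hello()

state = flow_a.run()  # runs locally; no registration or API needed
assert state.is_successful()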
kevin
03/26/2021, 4:39 PM

Christian Eik
03/26/2021, 6:06 PM

with Flow('fiege_po_uploads', schedule=cron_schedule) as fiege_po_uploads:
    po_nos_query = build_po_nos_query.map(brand=brands)
    po_nos = snowflake_queryfetch.map(
        query=po_nos_query, upstream_tasks=[po_nos_query])
    po_query = build_po_data_query.map(
        po_no=po_nos, brand=brands, upstream_tasks=[po_nos])
brands is a list of 2 strings. In the third task I'm trying to use 2 mapped arguments, and I can kinda understand that this doesn't really work. However, what is the proper way to do something like this? I'm a complete beginner to both Prefect and anything functional, so I probably have a conceptual misunderstanding here. Essentially what I'm trying to do is have an outer loop over brands, an inner loop over po_nos, and be able to use the value of brands in every inner iteration.
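One way to restructure this, sketched under the assumption that snowflake_queryfetch returns one list of PO numbers per brand: flatten the two loops into a single list of (brand, po_no) pairs in an intermediate task, then map once over the pairs.

from prefect import task

@task
def build_pairs(brands, po_nos_per_brand):
    # outer loop over brands, inner loop over that brand's po_nos
    return [
        (brand, po_no)
        for brand, po_nos in zip(brands, po_nos_per_brand)
        for po_no in po_nos
    ]

# inside the flow, build_po_data_query would then take a single pair argument:
# pairs = build_pairs(brands, po_nos)
# po_query = build_po_data_query.map(pair=pairs)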
Vincent
03/26/2021, 6:09 PM
Is there a way to set a default context for flow runs (the same way a Parameter can have a default)? I know that the client can submit a context, but it would be a nice feature to also specify a persistent default context for every run.
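For locally triggered runs, one workaround sketch is the prefect.context context manager (my_default_setting is a made-up key); the user config.toml can also carry a [context] section for persistent defaults.

import prefect
from prefect import Flow, task

@task
def report():
    print(prefect.context.get("my_default_setting"))  # prints "some-value"

with Flow("ctx-demo") as flow:
    report()

with prefect.context(my_default_setting="some-value"):
    flow.run()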
Mary Clair Thompson
03/26/2021, 6:24 PM

Louis-David Coulombe
03/26/2021, 7:56 PM

Luke Orland
03/26/2021, 8:29 PM

Colin Dablain
03/27/2021, 12:36 AM
TypeError: Cannot map over unsubscriptable object of type <class 'generator'>:
Is there something I'm missing conceptually/am I framing my problem incorrectly? I can provide code samples that I've tried if that would help. Thanks!
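Task.map needs a sized, subscriptable sequence, so one likely fix is simply to materialize the generator before mapping; a self-contained sketch (process and make_inputs are stand-ins):

from prefect import Flow, task

@task
def process(x):
    return x * 2

def make_inputs():  # stand-in for whatever currently yields your items
    yield from range(5)

with Flow("gen") as flow:
    results = process.map(list(make_inputs()))  # list() makes the generator mappable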
matta
03/27/2021, 2:14 AM
I'm trying to get a flow to always be marked as Successful. I've tried setting reference tasks after the Flow definition, I've tried making a task that raises signals.SUCCESS() and is downstream from the two main tasks (and I've tried setting the trigger to be all_finished); nothing seems to work. The main element of it is two mapped tasks.
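For comparison, a sketch of the reference-task approach (the two mapped tasks are stand-ins): give the flow a single always-succeeding final task with an all_finished trigger and make it the only reference task, so the flow's state follows it alone.

from prefect import Flow, task
from prefect.triggers import all_finished

@task
def step_one(x):
    return x

@task
def step_two(x):
    return x

@task(trigger=all_finished)
def always_ok():
    return True

with Flow("always-successful") as flow:
    a = step_one.map([1, 2, 3])
    b = step_two.map(a)
    ok = always_ok(upstream_tasks=[a, b])

flow.set_reference_tasks([ok])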
Ananthapadmanabhan P
03/27/2021, 4:33 PM
This is how I set my AWS credentials secret when creating the flow:

PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<my_key_here>", "SECRET_ACCESS_KEY": "<my_secret_key_here>"}' python create_flow.py

And this is how I internally pass it down into the KubernetesRun method:
job_env = {
    "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS":
        os.getenv("PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"),
    "PREFECT__BACKEND": "server",
}
flow.run_config = KubernetesRun(env=job_env,
                                image="ananthutest/prefect-test:latest")
But when I do a kubectl describe of the created pod/job in k8s, it shows PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS under Environment in plain text. Is there any way I can avoid this?
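A sketch of one way to keep the value out of the job spec, assuming a Kubernetes Secret named prefect-aws-creds with a credentials key already exists: reference the Secret from a custom job template instead of passing the literal value through env=.

from prefect.run_configs import KubernetesRun

job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "spec": {
                "containers": [{
                    "name": "flow",
                    "env": [{
                        "name": "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS",
                        "valueFrom": {
                            "secretKeyRef": {
                                "name": "prefect-aws-creds",  # assumed to exist
                                "key": "credentials",
                            }
                        },
                    }],
                }]
            }
        }
    },
}

flow.run_config = KubernetesRun(image="ananthutest/prefect-test:latest",
                                job_template=job_template)

kubectl describe then shows the variable as sourced from a Secret rather than its literal value.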
Trevor Kramer
03/27/2021, 6:59 PM

@task()
def get_s3_location(workspace_bucket):
    return f's3://{workspace_bucket}/{prefect.context.get("flow_run_name")}/'
Is there something like the simple addition Prefect turns into tasks automatically within the flow?
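Not fully automatic, but Prefect ships a templating task that covers this case; a sketch (the template is filled at runtime from prefect.context plus any kwargs, so flow_run_name comes in for free):

from prefect.tasks.templates import StringFormatter

get_s3_location = StringFormatter(template="s3://{workspace_bucket}/{flow_run_name}/")

# inside the flow:
# location = get_s3_location(workspace_bucket=bucket)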
xyzy
03/27/2021, 8:53 PM