ciaran
03/25/2021, 11:28 AM
ciaran
03/25/2021, 12:04 PM
0.14.3
are not aligned to the commands I'm able to run...
$ prefect version
0.14.3
$ prefect register -h
Usage: prefect register [OPTIONS] COMMAND [ARGS]...
  Register flows
  Usage:
      $ prefect register [OBJECT]
  Arguments:
      flow    Register flows with a backend API
  Examples:
      $ prefect register flow --file my_flow.py --name My-Flow
Options:
  -h, --help  Show this message and exit.
I'd expect to see:
  Register one or more flows into a project.
Options:
  --project TEXT     The name of the Prefect project to register this flow in.
                     Required.
  -p, --path TEXT    A path to a file or a directory containing the flow(s) to
                     register. May be passed multiple times to specify
                     multiple paths.
  -m, --module TEXT  A python module name containing the flow(s) to register.
                     May be passed multiple times to specify multiple modules.
  ...
As defined in the docs at https://docs.prefect.io/api/latest/cli/register.html
ciaran
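On the CLI mismatch above: the installed CLI reports 0.14.3, while the `latest` docs track the newest release, so options documented there may not exist yet locally. One way to catch this kind of drift early is a version check before relying on a documented feature. The sketch below assumes plain `major.minor.patch` release strings. It compares versions as integer tuples, because a plain string comparison ranks "0.14.3" above "0.14.13". `REQUIRED_VERSION` is a hypothetical placeholder, not the release that actually introduced the new `register` options. For pre-releases and other edge cases, the `packaging` library's `Version` class is the more robust choice.

```python
def parse_version(version: str) -> tuple:
    """Turn a plain 'major.minor.patch' string into a tuple of ints."""
    # Assumes release strings only; tags like '0.15.0rc1' are not handled.
    return tuple(int(part) for part in version.split("."))


def is_at_least(installed: str, required: str) -> bool:
    """Return True if `installed` is the same as or newer than `required`."""
    return parse_version(installed) >= parse_version(required)


# Hypothetical placeholder: use the version the docs you are reading describe.
REQUIRED_VERSION = "0.14.13"

installed = "0.14.3"  # e.g. the output of `prefect version`
print("0.14.3" > "0.14.13")                     # True: string comparison gets the order wrong
print(is_at_least(installed, REQUIRED_VERSION))  # False
```

If the check fails, the options are to upgrade Prefect or to read the docs that match the installed version.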
03/25/2021, 1:48 PM
Ryan Abernathey
03/25/2021, 1:52 PM
Ryan Abernathey
03/25/2021, 1:56 PM
`Client.register_worker_plugin` (https://docs.dask.org/en/latest/futures.html#distributed.Client.register_worker_plugin) with my Dask clusters to install custom pip packages (e.g. to install from a dev branch for debugging). If I am using Prefect with an existing Dask scheduler, how can I call `client.register_worker_plugin`? I tried
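from prefect.executors import DaskExecutor
# `cluster` is the already-running Dask cluster whose scheduler Prefect should use
executor = DaskExecutor(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security}
)
executor.client.register_worker_plugin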
but it looks like the `client` attribute on `executor` is None.
Amit Gal
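On Ryan's `register_worker_plugin` question: my understanding is that in Prefect 0.14, `DaskExecutor` only creates its `distributed.Client` inside its `start()` context manager once a flow run begins. That would explain why `executor.client` is `None` at build time. One possible workaround is to subclass the executor and register the plugin right after the parent's `start()` has created the client. The sketch below uses stand-in classes so it runs without Prefect or Dask. `FakeClient`, `BaseExecutor` and `PluginExecutor` are hypothetical; with the real libraries you would subclass `DaskExecutor` and pass a real worker plugin. A simpler alternative, since the scheduler already exists, is to open your own `distributed.Client` to `cluster.scheduler_address` (with the same `security`) and call `register_worker_plugin` on it once before running the flow. As I read the Dask docs, a plugin registered that way also applies to workers that join later.

```python
from contextlib import contextmanager


class FakeClient:
    """Stand-in for distributed.Client: just records registered plugins."""

    def __init__(self, address):
        self.address = address
        self.plugins = []

    def register_worker_plugin(self, plugin):
        self.plugins.append(plugin)


class BaseExecutor:
    """Stand-in executor whose client only exists while start() is active."""

    def __init__(self, address):
        self.address = address
        self.client = None  # nothing is connected when the flow is built

    @contextmanager
    def start(self):
        self.client = FakeClient(self.address)
        try:
            yield
        finally:
            self.client = None


class PluginExecutor(BaseExecutor):
    """Registers worker plugins as soon as the parent has created the client."""

    def __init__(self, address, worker_plugins=()):
        super().__init__(address)
        self.worker_plugins = list(worker_plugins)

    @contextmanager
    def start(self):
        with super().start():
            for plugin in self.worker_plugins:
                self.client.register_worker_plugin(plugin)
            yield


executor = PluginExecutor("tcp://scheduler:8786", worker_plugins=["pip-install-plugin"])
print(executor.client)  # None: no client exists before start()
with executor.start():
    print(executor.client.plugins)  # ['pip-install-plugin']
```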
03/25/2021, 2:11 PM
from prefect import Flow, task
@task()
def some_task(x):
    return x+1
some_list = [1, 2, 3, 4]
with Flow("list_comp") as list_comp_flow:
    mapped_result = some_task.map(some_list)
    list_comp = [result+1 for result in mapped_result]
results in the following error:
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
Which makes sense, of course, since while building the flow, `mapped_result` is still a `Task`, not a `list`.
Steve Aby
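On Amit's list-comprehension question: the comprehension runs while the flow is being built, so it iterates over a `Task` object rather than over results. In Prefect, the usual fix is to express the extra step as another mapped task, e.g. `some_task.map(mapped_result)`, so that the work happens at run time. The toy model below is not Prefect's implementation. It reproduces the same build-time versus run-time split in plain Python: `Deferred` only records what to do, and only `run()` produces values.

```python
class Deferred:
    """Toy stand-in for a mapped task: it records work instead of doing it."""

    def __init__(self, fn, source):
        self.fn = fn
        self.source = source  # a plain list or another Deferred

    def __iter__(self):
        # Mirrors the error in the question: there is nothing to iterate yet.
        raise TypeError("Deferred is not iterable until the flow runs")

    def run(self):
        values = self.source.run() if isinstance(self.source, Deferred) else self.source
        return [self.fn(v) for v in values]


def some_task(x):
    return x + 1


mapped_result = Deferred(some_task, [1, 2, 3, 4])

try:
    [r + 1 for r in mapped_result]  # fails: this runs at "build" time
except TypeError as err:
    print(err)

# Instead, chain another mapped step and let run() evaluate both.
list_comp = Deferred(some_task, mapped_result)
print(list_comp.run())  # [3, 4, 5, 6]
```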
03/25/2021, 4:13 PM
Marwan Sarieddine
03/25/2021, 4:17 PM
Josh Greenhalgh
03/25/2021, 4:54 PM
`RunNamespacedJob` task);
How can I force it to defer trying to access the cluster until it's actually running?
ciaran
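On Josh's question about deferring cluster access: the full code isn't visible here, so this is only the general pattern. In Prefect, module-level code and a task's `__init__` execute when the flow is built, whereas a task's `run()` executes during the flow run. Keeping the cluster connection inside `run()` therefore defers it. The sketch below is a hypothetical class, not the `RunNamespacedJob` API. `connect` stands in for whatever loads the Kubernetes config and builds the API client.

```python
from typing import Callable, Optional


class DeferredClusterJob:
    """Hypothetical job wrapper: builds its client only when run() is called."""

    def __init__(self, namespace: str, connect: Callable[[], object]):
        # Build time: remember how to connect, but don't touch the cluster.
        self.namespace = namespace
        self._connect = connect
        self._client: Optional[object] = None

    def run(self) -> str:
        # Run time: the first call opens the connection; later calls reuse it.
        if self._client is None:
            self._client = self._connect()
        return f"submitted job to {self.namespace} via {self._client}"


calls = []


def fake_connect():
    """Hypothetical connector that records each time it is invoked."""
    calls.append("connected")
    return "k8s-api-client"


job = DeferredClusterJob("default", fake_connect)
print(calls)      # []: constructing the job did not connect
print(job.run())
print(calls)      # ['connected']
```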
03/25/2021, 5:38 PM
Failed to load and execute Flow's environment: StorageError("An error occurred while unpickling the flow:\n TypeError('code() takes at most 15 arguments (16 given)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with '3.8.6', currently running with '3.7.10')")?
For background, I'm using an ECS Agent.
ciaran
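On the unpickling error above: the message says the flow was pickled under Python 3.8.6 but unpickled under 3.7.10. Flows pickled under one Python minor version often can't be unpickled under another, which is what the failing `code()` call reflects. The usual fix is to run the flow in an image built on the same Python minor version it was registered with. The sketch below shows a small check you could run in both environments. It assumes you record the build interpreter's version as a string. `matches_build` is a hypothetical helper, not part of Prefect.

```python
import sys


def minor_version(version: str) -> tuple:
    """'3.8.6' -> (3, 8); patch releases don't matter for this check."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def matches_build(build_version: str, runtime=sys.version_info) -> bool:
    """True if the runtime interpreter has the same major.minor as the build."""
    return tuple(runtime[:2]) == minor_version(build_version)


# Versions taken from the error message above.
print(matches_build("3.8.6", runtime=(3, 7, 10)))  # False
print(matches_build("3.8.6", runtime=(3, 8, 12)))  # True
```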
03/25/2021, 6:19 PM
Renzo Becerra
03/25/2021, 6:21 PM
Berty
03/25/2021, 7:30 PM
NET::ERR_CERT_COMMON_NAME_INVALID for prefect.io
Jack Sundberg
03/25/2021, 7:48 PM
Trevor Kramer
03/25/2021, 10:15 PM
[2021-03-25 17:12:55-0500] DEBUG - prefect.TaskRunner | Task 'AWSClientWait': Handling state change from Running to Success
[2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'AWSClientWait': Finished task run for task with final state: 'Success'
[2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'pruning': Starting task run...
[2021-03-25 17:12:55-0500] DEBUG - prefect.TaskRunner | Task 'pruning': Handling state change from Pending to Failed
[2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'pruning': Finished task run for task with final state: 'Failed'
It seems to be failing before it even gets called.
matta
03/25/2021, 11:30 PM
Po Stevanus
03/26/2021, 8:09 AM
Failed to load and execute Flow's environment: Forbidden('GET <https://storage.googleapis.com/storage/v1/b/bonza-dev-files?projection=noAcl&prettyPrint=false>: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')
Background:
• Agent: Kubernetes Autopilot
• Storage: Google Cloud Storage
What I’ve done:
1. I prepared a service account with the `Storage.Admin` role; I called it OLYMPUS_DEV_SA.
2. When declaring the storage, I referenced it like this:
    storage = GCS(bucket="bonza-dev-files", project="bonza-dev", secrets=["OLYMPUS_DEV_SA"])
If it helps, I’ve attached the code to this thread.
Is there a way to debug this? 🙇
Mahesh
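On Po's GCS permission error: one debugging step is to confirm which identity the stored secret actually authenticates as, and then check that identity's IAM roles on the bucket. A service-account JSON key carries `type` and `client_email` fields, so a small stdlib helper can report the email without calling any Google API. It may also be worth checking how Prefect's `GCS` storage picks up credentials. My understanding is that Prefect's GCP helpers look for a secret named `GCP_CREDENTIALS`. If so, a secret stored under a different name would not be used for the download, and the pod's default credentials would be used instead. `describe_service_account_key` and the key contents below are hypothetical.

```python
import json


def describe_service_account_key(raw_key: str) -> str:
    """Return the client_email of a service-account key, or raise ValueError."""
    try:
        key = json.loads(raw_key)
    except json.JSONDecodeError as err:
        raise ValueError(f"secret is not valid JSON: {err}") from err
    if key.get("type") != "service_account":
        raise ValueError(f"expected type 'service_account', got {key.get('type')!r}")
    if "client_email" not in key:
        raise ValueError("key has no client_email field")
    return key["client_email"]


# Fake, illustrative key contents; use the real secret value when debugging locally.
fake_key = json.dumps({
    "type": "service_account",
    "project_id": "bonza-dev",
    "client_email": "olympus-dev@bonza-dev.iam.gserviceaccount.com",
})
print(describe_service_account_key(fake_key))
```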
03/26/2021, 10:23 AM
ciaran
03/26/2021, 11:48 AM
`dask-cloudprovider[aws]==2021.3.0` when I have `prefect[aws]==0.14.13` installed. It seems they have different requirements for botocore:
There are incompatible versions in the resolved dependencies:
botocore<1.19.53,>=1.19.52 (from aiobotocore==1.2.2->dask-cloudprovider[aws]==2021.3.0->-r /var/folders/kf/93zlmdv15vz6sjhr2xd0j7y40000gn/T/pipenv_bvj4rpkrequirements/pipenv-1_o8bqwg-constraints.txt (line 6))
botocore<1.21.0,>=1.20.38 (from boto3==1.17.38->prefect[aws]==0.14.13->-r /var/folders/kf/93zlmdv15vz6sjhr2xd0j7y40000gn/T/pipenv_bvj4rpkrequirements/pipenv-1_o8bqwg-constraints.txt (line 5))
Is there a specific version of `dask-cloudprovider` that `prefect` works with?
Greg Roche
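On the botocore conflict: the two constraints in the pipenv output can be checked by hand. aiobotocore 1.2.2 (via dask-cloudprovider) wants botocore `>=1.19.52,<1.19.53`, while boto3 1.17.38 (via prefect[aws] 0.14.13) wants `>=1.20.38,<1.21.0`. The sketch below intersects the two half-open ranges and shows that no botocore release satisfies both. A resolver therefore can't succeed unless one of the upstream pins changes, for example a different aiobotocore or boto3 version. It only handles plain `x.y.z` versions.

```python
from typing import Optional, Tuple

Version = Tuple[int, ...]
Range = Tuple[Version, Version]  # half-open: [low, high)


def v(text: str) -> Version:
    """Parse a plain 'x.y.z' version into a tuple of ints."""
    return tuple(int(part) for part in text.split("."))


def intersect(a: Range, b: Range) -> Optional[Range]:
    """Intersect two half-open ranges; None if they don't overlap."""
    low = max(a[0], b[0])
    high = min(a[1], b[1])
    return (low, high) if low < high else None


# botocore ranges from the pipenv output above.
from_aiobotocore = (v("1.19.52"), v("1.19.53"))
from_boto3 = (v("1.20.38"), v("1.21.0"))

print(intersect(from_aiobotocore, from_boto3))  # None: no version satisfies both
```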
03/26/2021, 3:18 PM
`flow.register()` on such a flow fails with the error `'list' object has no attribute 'update'`.
A reproducible example is in the thread. Has anybody come across this before, or does anyone know how to resolve it?
Ben Fogelson
03/26/2021, 3:26 PM
`all_finished` but that does different things depending on whether it would have been triggered by `all_successful` or `some_failed`.
Ryan Abernathey
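On Ben's trigger question: one way to get "run once everything has finished, but behave differently on success versus failure" is to split the work into two downstream tasks. One would use `trigger=all_successful` and the other `trigger=any_failed`, both from `prefect.triggers`. The plain-Python sketch below only models that branching decision. The state names are simplified strings rather than Prefect `State` objects, and `finish_step` is hypothetical.

```python
from typing import Dict


def all_successful(upstream: Dict[str, str]) -> bool:
    """Simplified model: every upstream task ended in 'Success'."""
    return all(state == "Success" for state in upstream.values())


def any_failed(upstream: Dict[str, str]) -> bool:
    """Simplified model: at least one upstream task ended in 'Failed'."""
    return any(state == "Failed" for state in upstream.values())


def finish_step(upstream: Dict[str, str]) -> str:
    """Runs once everything has finished, then picks a branch."""
    if all_successful(upstream):
        return "success path"
    if any_failed(upstream):
        return "failure path"
    return "other path"  # e.g. skipped upstreams in this simplified model


print(finish_step({"extract": "Success", "load": "Success"}))  # success path
print(finish_step({"extract": "Success", "load": "Failed"}))   # failure path
```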
03/26/2021, 3:37 PM
liren zhang
03/26/2021, 3:47 PM
`StartFlowRun(flow_name="A", project_name="examples", wait=True)` can execute a child flow that has not been registered. Basically, I would like to test everything locally before I register anything.
kevin
03/26/2021, 4:39 PM
Christian Eik
03/26/2021, 6:06 PM
with Flow('fiege_po_uploads', schedule=cron_schedule) as fiege_po_uploads:
    po_nos_query = build_po_nos_query.map(brand=brands)
    po_nos = snowflake_queryfetch.map(
        query=po_nos_query, upstream_tasks=[po_nos_query])
    po_query = build_po_data_query.map(
        po_no=po_nos, brand=brands, upstream_tasks=[po_nos])
`brands` is a list of 2 strings. In the third task I'm trying to use 2 mapped arguments, and I can kind of understand that this doesn't really work. However, what is the proper way to do something like this? I'm a complete beginner to both Prefect and anything functional, so I probably have a conceptual misunderstanding here. Essentially, what I'm trying to do is have an outside loop over `brands`, an inside loop over `po_nos`, and be able to use the current value from `brands` in every inner iteration.
Vincent
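On Christian's nested-mapping question: Prefect's `map` zips mapped arguments element-wise. As a result, `build_po_data_query.map(po_no=po_nos, brand=brands)` pairs the i-th brand with the i-th *list* of PO numbers rather than looping over individual POs. A common workaround is an extra, unmapped task that flattens the nested data into `(brand, po_no)` pairs, followed by mapping the query builder over those pairs. The plain-Python function below shows only that flattening step. `pair_brands_with_pos` is a hypothetical name; in a flow you would decorate it with `@task`. It assumes `po_nos[i]` holds the PO numbers fetched for `brands[i]`.

```python
from typing import List, Tuple


def pair_brands_with_pos(brands: List[str], po_nos: List[List[str]]) -> List[Tuple[str, str]]:
    """Flatten per-brand PO lists into (brand, po_no) pairs.

    Assumes po_nos[i] holds the PO numbers fetched for brands[i].
    """
    pairs = []
    for brand, brand_pos in zip(brands, po_nos):  # outer loop over brands
        for po_no in brand_pos:                   # inner loop over that brand's POs
            pairs.append((brand, po_no))
    return pairs


# Illustrative data only.
brands = ["brand_a", "brand_b"]
po_nos = [["PO1", "PO2"], ["PO3"]]
print(pair_brands_with_pos(brands, po_nos))
# [('brand_a', 'PO1'), ('brand_a', 'PO2'), ('brand_b', 'PO3')]
```

The query-builder task can then be mapped over the returned pairs, unpacking `brand, po_no = pair` inside, so every call sees both the brand and one PO number.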
03/26/2021, 6:09 PM
`Parameter` can have a default). I know that the client can submit a context, but it would be a nice feature to also specify a persistent default context for every run.
Mary Clair Thompson
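On Vincent's default-context request: the behaviour described amounts to a stored default context with each run's submitted context layered over it. Per-run keys win, much as a `Parameter` default is used only when no value is passed. The sketch below models that precedence rule with plain dictionaries. It is not a Prefect API. `DEFAULT_CONTEXT` and its keys are hypothetical.

```python
from typing import Any, Dict, Optional

# Hypothetical persistent defaults applied to every run.
DEFAULT_CONTEXT: Dict[str, Any] = {"env": "dev", "batch_size": 100}


def effective_context(run_context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Merge defaults with a per-run context; per-run keys take precedence."""
    return {**DEFAULT_CONTEXT, **(run_context or {})}


print(effective_context())                 # {'env': 'dev', 'batch_size': 100}
print(effective_context({"env": "prod"}))  # {'env': 'prod', 'batch_size': 100}
```

Building a new dict on each call keeps the stored defaults unchanged between runs.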
03/26/2021, 6:24 PM
Louis-David Coulombe
03/26/2021, 7:56 PM
Luke Orland
03/26/2021, 8:29 PM
Colin Dablain
03/27/2021, 12:36 AM
TypeError: Cannot map over unsubscriptable object of type <class 'generator'>:
Is there something I'm missing conceptually/am I framing my problem incorrectly? I can provide code samples that I've tried if that would help. Thanks!
matta
03/27/2021, 1:29 AM
Michael Adkins
03/27/2021, 1:54 AM
`yield` / generators in tasks, but there's not even a design document yet.
Colin Dablain
03/27/2021, 2:55 AM
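On Colin's generator error: mapping needs to index into its input, and a generator can't be indexed, which matches the `unsubscriptable object` message. Per Michael's reply, `yield`/generators in tasks aren't supported yet. A workaround is to have the task consume the generator and return a list. In the sketch below, `produce_items` is a hypothetical generator standing in for whatever the task currently yields.

```python
from typing import Iterator, List


def produce_items(n: int) -> Iterator[int]:
    """Hypothetical generator standing in for the original task body."""
    for i in range(n):
        yield i * 10


def produce_items_as_list(n: int) -> List[int]:
    """Task-friendly version: materialize the generator before returning."""
    return list(produce_items(n))


gen = produce_items(3)
try:
    gen[0]  # generators don't support indexing, hence the mapping error
except TypeError as err:
    print(err)  # 'generator' object is not subscriptable

items = produce_items_as_list(3)
print(items[0], len(items))  # 0 3
```

Returning a list means the whole result is held in memory at once, which is the trade-off compared with a true streaming generator.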