Varun Joshi
04/08/2021, 5:46 PM
Joseph Ellis
04/08/2021, 5:47 PM
Brent Bateman
04/08/2021, 7:46 PM
Berty
04/08/2021, 7:55 PM
dh
04/08/2021, 11:22 PM
registered_flow.run(**runtime_args_passed_by_agent)
Or is this disallowed by design? (registered flow shall be self-sufficient for reproducibility)
Context: we have a flow that depends on a value that changes quite often (e.g. a dependency package version number). We don’t want to register a new flow each time we update the version number; we would rather have one flow and have the agent run it, expecting the package version information to be provided at run time. We thought about using an env var, but we’re not sure if it’s the best way…
Rob Fowler
04/09/2021, 4:26 AM
Brian Keating
04/09/2021, 5:45 AM
ResourceManager to create and terminate EC2 instances. I tested it out with a bare-bones workflow:
import prefect
from prefect import Flow, task

@task
def do_something_on_instance(instance_id):
    prefect.context.get('logger').info(f'Do something on instance {instance_id}')

# EC2Instance is the poster's own ResourceManager; its definition is not shown here
with Flow('hello-ec2') as flow:
    with EC2Instance('t2.micro') as instance_id:
        do_something_on_instance(instance_id)  # instance_id is a string
This works correctly when using GitHub storage, but when I switch to S3, the flow fails with TypeError: cannot pickle 'SSLContext' object. Anyone know what's going on here? Note that the value returned by EC2Instance.setup is a str.
Joe McDonald
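A note on the pickling error above, as a minimal sketch using only the standard library. It assumes (the `EC2Instance` definition is not shown, so this is a guess) that something reachable from the flow holds a live TLS-backed client when the flow is serialized. The two `make_*_state` helpers and `can_pickle` are hypothetical names for illustration, not Prefect or boto3 APIs.

```python
import pickle
import ssl


def make_eager_state():
    # Hypothetical state that already holds a live TLS context,
    # similar to an object that created a network client up front.
    return {"instance_type": "t2.micro", "tls": ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)}


def make_lazy_state():
    # Hypothetical state that only stores plain configuration;
    # the TLS context would be created later, at run time.
    return {"instance_type": "t2.micro", "tls": None}


def can_pickle(obj):
    """Return True if obj can be serialized with pickle, False on TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False


lazy_ok = can_pickle(make_lazy_state())
eager_ok = can_pickle(make_eager_state())
```

If this guess applies, creating the client inside `setup` (at run time) rather than at import or construction time may avoid the error with pickle-based storage.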
04/09/2021, 5:52 AM
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
xyzz
04/09/2021, 8:54 AM
Noah Holm
04/09/2021, 9:41 AM
Marko Jamedzija
04/09/2021, 10:54 AM
scheduled state (becoming late runs). Is this a bug, or is there a way of configuring the k8s agent to work with more than one tenant? If so, how? Thanks 🙂 (I deployed the services using the prefecthq helm chart)
Brent Bateman
04/09/2021, 1:45 PM
Joe McDonald
04/09/2021, 1:50 PM
Trevor Kramer
04/09/2021, 1:55 PM
with OracleRDS() as client?
Oussama Louati
04/09/2021, 2:16 PM
docker run my_image, I get this error each time:
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/root/'")
Should I run the agent inside the container? Thank you
Paul Prescod
04/09/2021, 3:00 PM
Hygor Knust
04/09/2021, 3:32 PM
Jonathan Buys
04/09/2021, 3:55 PM
xyzy
04/09/2021, 4:04 PM
ProgrammingError('SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 8032 and this is thread id 2308.')
Is there a way to work around this other than using a LocalExecutor? e.g. a parameter for the ResourceManager to tell it to keep everything that uses it in the same thread?
Josh
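One option sometimes used for this SQLite error, sketched with the standard library only: open the connection with `check_same_thread=False` and guard it with a lock. Whether this fits the poster's ResourceManager is an assumption; the helper names below are hypothetical.

```python
import sqlite3
import threading


def open_shared_connection(path=":memory:"):
    # check_same_thread=False turns off sqlite3's same-thread check;
    # the caller then has to serialize access itself (see the lock below).
    return sqlite3.connect(path, check_same_thread=False)


def insert_from_worker_thread(conn, lock, value):
    """Insert one row from a different thread and return any errors raised."""
    errors = []

    def work():
        try:
            with lock:
                conn.execute("INSERT INTO items (value) VALUES (?)", (value,))
                conn.commit()
        except sqlite3.ProgrammingError as exc:
            errors.append(exc)

    worker = threading.Thread(target=work)
    worker.start()
    worker.join()
    return errors


conn = open_shared_connection()
conn.execute("CREATE TABLE items (value TEXT)")
lock = threading.Lock()
errors = insert_from_worker_thread(conn, lock, "hello")
rows = conn.execute("SELECT value FROM items").fetchall()
```

The lock matters here: with the thread check disabled, concurrent use of one connection is no longer prevented by the library.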
04/09/2021, 4:27 PM
terminal_state_handlers on Flow objects that will cause a flow to fail if they aren’t present even though the 0.14.15 version defaults to None.
• log_output on the ExecuteNotebook class that defaults to False on the 0.14.15 version, but the execution fails if a flow on a previous version is being run with the attribute not specified
Kevin Kho
04/09/2021, 5:28 PM
Sean Harkins
04/09/2021, 5:33 PM
PipInstall https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.PipInstall to install dependencies on our workers at worker creation time). We are using a temporary cluster model as described here https://docs.prefect.io/orchestration/flow_config/executors.html#using-a-temporary-cluster. Is it possible to somehow obtain a reference to the `DaskExecutor`’s distributed client https://github.com/PrefectHQ/prefect/blob/master/src/prefect/executors/dask.py#L209 from within our Flow code (or Task code) so that we can call register_worker_plugin? I’ve browsed through the codebase but I’m not sure where in the Flow execution process the `DaskExecutor`’s client is yielded and used.
Trevor Kramer
04/09/2021, 8:50 PM
KIRYL BUCHA
04/10/2021, 11:51 AM
Trevor Kramer
04/10/2021, 1:42 PM
Slackbot
04/10/2021, 5:44 PM
Sean Harkins
04/11/2021, 11:17 PM
WorkerPlugins). To test this initially I applied our decorator to a test task
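from functools import wraps

import distributed
import prefect
from distributed.diagnostics.plugin import PipInstall
from prefect import task

def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Register the pip plugin with the Dask client before running the task body
        client = distributed.get_client()
        plugin = PipInstall(packages=["xarray"])
        client.register_worker_plugin(plugin)
        result = func(*args, **kwargs)
        return result
    return wrapper

@task
@register_plugin
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"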
This works as expected and the Dask worker logs show the plugin use.
[2021-04-11 23:11:49+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Starting task run...
distributed.worker - INFO - Starting Worker plugin pip
distributed.diagnostics.plugin - INFO - Pip installing the following packages: ['xarray']
[2021-04-11 23:11:50+0000] INFO - prefect.say_hello | Hello, Cloud
[2021-04-11 23:11:50+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
However, we don’t have access to the underlying tasks so instead we need to access them from an existing flow, wrap them and replace them within the flow prior to flow registration with
for flow_task in flow.tasks:
    wrapped_task = register_plugin(flow_task)
    flow.replace(flow_task, wrapped_task)
But using this approach seems to alter the task’s execution, as the worker logs do not report the plugin use or the Prefect logger statements.
distributed.core - INFO - Starting established connection
[2021-04-11 22:43:26+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Starting task run...
[2021-04-11 22:43:27+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Finished task run for task with final state: 'Success'
Am I missing something obvious in my task wrapping approach and replacement in the flow? Is there a better approach for accomplishing this?
Svyat
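A possible reading of the `Constant[function]` lines in the logs above: the decorator returns a plain function, so wrapping a task object yields a function rather than a task. The sketch below illustrates that difference without Prefect or Dask. `FakeTask`, `calls`, and `wrap_task_run` are hypothetical stand-ins, and whether patching `run` is the right hook in Prefect is an assumption.

```python
from functools import wraps


class FakeTask:
    """Hypothetical stand-in for a task object whose runner calls .run()."""

    def __init__(self, name):
        self.name = name

    def run(self):
        return f"{self.name} result"


calls = []  # records when the placeholder plugin hook fires


def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Placeholder for client.register_worker_plugin(...)
        calls.append("plugin registered")
        return func(*args, **kwargs)
    return wrapper


def wrap_task_run(task):
    # Wrap the method that does the work and keep the task object itself.
    task.run = register_plugin(task.run)
    return task


# Wrapping the object returns a plain function, not a FakeTask.
wrapped_object = register_plugin(FakeTask("say_hello"))
is_still_task = isinstance(wrapped_object, FakeTask)

# Wrapping the run method keeps the task type and fires the hook on run().
patched = wrap_task_run(FakeTask("say_hello"))
result = patched.run()
```

Under this reading, a loop over `flow.tasks` would wrap each task's `run` method in place instead of replacing the task in the flow.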
04/12/2021, 12:02 AM
Rob Fowler
04/12/2021, 3:49 AM
Rob Fowler
04/12/2021, 9:49 AM
docker run -it prefecthq/prefect:latest-python3.6 /bin/bash
root@c281a603b87e:/# prefect version
0.14.12
Greg Roche
04/12/2021, 10:09 AM
(.venv) C:\Users\groche\source> docker run -it prefecthq/prefect:latest-python3.6 /bin/bash
Unable to find image 'prefecthq/prefect:latest-python3.6' locally
latest-python3.6: Pulling from prefecthq/prefect
75646c2fb410: Already exists
62342603b9a2: Already exists
0bdd7747fb18: Pull complete
c8ecc0b9e8c5: Pull complete
e51cfb6f5145: Pull complete
e813c6a2bec6: Pull complete
56280ee759f3: Pull complete
Digest: sha256:b663f215c0dccedf2ee432ebaafa8d636e4aa424fa6e35e42c516385fa6718d6
Status: Downloaded newer image for prefecthq/prefect:latest-python3.6
root@2838bac2f4a5:/# prefect version
0.14.15
Looks like your Docker image is outdated.
Rob Fowler
04/12/2021, 11:14 AM