dh
04/08/2021, 11:22 PM
registered_flow.run(**runtime_args_passed_by_agent)
Or is this disallowed by design? (A registered flow should be self-sufficient for reproducibility.)
Context: we have a flow that depends on a value that changes quite often (e.g. a dependency package version number). We don't want to register a new flow each time we update the version number; rather, we'd like one flow, and have the agent run it expecting the package version to be provided at run time. We thought about using an env var, but we're not sure it's the best way…
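For reference, one common approach in Prefect 0.14.x is to model the changing value as a Parameter, so a single registered flow takes the version at run time; this sketch uses made-up task and parameter names:

import prefect
from prefect import Flow, Parameter, task

@task
def use_version(package_version):
    # The runtime-supplied value, e.g. a dependency version to pin.
    prefect.context.get("logger").info(f"Running with package version {package_version}")

with Flow("versioned-flow") as flow:
    # The default keeps the flow runnable unattended; any run may override it.
    package_version = Parameter("package_version", default="1.0.0")
    use_version(package_version)

# Locally: flow.run(parameters={"package_version": "1.2.3"})
# Against a backend: supply the same dict when creating the flow run
# (UI, CLI, or Client.create_flow_run(..., parameters=...)).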
Rob Fowler
04/09/2021, 4:26 AM
Brian Keating
04/09/2021, 5:45 AM
I wrote a ResourceManager to create and terminate EC2 instances. I tested it out with a bare-bones workflow:
import prefect
from prefect import task, Flow

@task
def do_something_on_instance(instance_id):
    prefect.context.get('logger').info(f'Do something on instance {instance_id}')

with Flow('hello-ec2') as flow:
    with EC2Instance('t2.micro') as instance_id:  # EC2Instance is the custom ResourceManager
        do_something_on_instance(instance_id)  # instance_id is a string
This works correctly when using GitHub storage, but when I switch to S3, the flow fails with TypeError: cannot pickle 'SSLContext' object. Anyone know what's going on here? Note that the value returned by EC2Instance.setup is a str.
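One hedged explanation, since the EC2Instance code isn't shown: S3 storage pickles the flow with cloudpickle, while GitHub storage re-imports the script at run time, so anything the resource manager stores on itself gets pickled under S3, and a boto3 client carries an SSLContext, which can't be pickled. A sketch that keeps the client out of the pickle by creating it inside setup/cleanup (the AMI id is a placeholder):

import boto3
from prefect import resource_manager

@resource_manager
class EC2Instance:
    def __init__(self, instance_type):
        # Store only picklable configuration here; no boto3 client.
        self.instance_type = instance_type

    def setup(self) -> str:
        ec2 = boto3.client("ec2")  # created at run time, never pickled
        response = ec2.run_instances(
            ImageId="ami-00000000",  # placeholder AMI id
            InstanceType=self.instance_type,
            MinCount=1,
            MaxCount=1,
        )
        return response["Instances"][0]["InstanceId"]

    def cleanup(self, instance_id: str):
        boto3.client("ec2").terminate_instances(InstanceIds=[instance_id])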
Joe McDonald
04/09/2021, 5:52 AM
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
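For context, ECS throttles concurrent RegisterTaskDefinition calls on a family, so many flow runs starting at once and each registering a task definition can trip this. One hedged mitigation, assuming boto3 is being called directly: enable botocore's adaptive retry mode so throttled calls back off and retry:

import boto3
from botocore.config import Config

# Adaptive retries back off automatically on throttling errors such as
# the RegisterTaskDefinition ClientException above.
ecs = boto3.client("ecs", config=Config(retries={"max_attempts": 10, "mode": "adaptive"}))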
xyzz
04/09/2021, 8:54 AM
Noah Holm
04/09/2021, 9:41 AM
Marko Jamedzija
04/09/2021, 10:54 AM
Flow runs stay in the scheduled state (becoming late runs). Is this a bug, or is there a way to configure the k8s agent to work with more than one tenant? If so, how? Thanks 🙂 (I deployed the services using the prefecthq Helm chart.)
Brent Bateman
04/09/2021, 1:45 PM
Joe McDonald
04/09/2021, 1:50 PM
Trevor Kramer
04/09/2021, 1:55 PM
with OracleRDS() as client?
Oussama Louati
04/09/2021, 2:16 PM
When I run docker run my_image, I get this error each time:
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/root/'")
Should I run the agent inside the container? Thank you
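One hedged guess, since the flow code isn't shown: with the default pickle-based Local storage, the run process tries to locate the stored flow by path, and a bare directory such as /root/ surfaces as exactly this ModuleNotFoundError. If the flow script is baked into the image, script-based storage with an explicit path is one way around it (the path below is illustrative):

from prefect.storage import Local

# Point Prefect at the flow script inside the image instead of
# unpickling from the default local directory.
flow.storage = Local(path="/root/flow.py", stored_as_script=True)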
Paul Prescod
04/09/2021, 3:00 PM
Hygor Knust
04/09/2021, 3:32 PM
Jonathan Buys
04/09/2021, 3:55 PM
xyzy
04/09/2021, 4:04 PM
ProgrammingError('SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 8032 and this is thread id 2308.')
Is there a way to work around this other than using a LocalExecutor? E.g., a parameter for the ResourceManager to tell it to keep everything that uses it in the same thread?
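A minimal sketch of one workaround, assuming the ResourceManager currently hands a shared sqlite3 connection to the tasks: have setup return only the database path, and open a short-lived connection inside each task, so no sqlite3 object ever crosses threads (names are illustrative):

import sqlite3
from prefect import task

@task
def query_db(db_path):
    # Connect in whatever thread the executor runs this task on.
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute("SELECT 1").fetchone()
    finally:
        conn.close()

sqlite3.connect also accepts check_same_thread=False, but sharing one connection across threads then needs explicit locking.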
04/09/2021, 4:27 PM
• terminal_state_handlers on Flow objects, which will cause a flow to fail if they aren't present, even though the 0.14.15 version defaults them to None.
• log_output on the ExecuteNotebook class, which defaults to False on the 0.14.15 version, but execution fails if a flow from a previous version is run with the attribute not specified.
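A hedged sketch of the defensive workaround these points imply: set the new attributes explicitly at definition time, so the serialized flow carries them whichever Prefect version runs it (the notebook path is a placeholder):

from prefect import Flow
from prefect.tasks.jupyter import ExecuteNotebook

# Passing log_output explicitly avoids relying on the 0.14.15 default
# being present on the executing side.
run_notebook = ExecuteNotebook(path="notebooks/report.ipynb", log_output=True)

with Flow("notebook-flow") as flow:
    run_notebook()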
Kevin Kho
Sean Harkins
04/09/2021, 5:33 PM
We'd like to use the PipInstall worker plugin (https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.PipInstall) to install dependencies on our workers at worker creation time. We are using a temporary cluster model as described here: https://docs.prefect.io/orchestration/flow_config/executors.html#using-a-temporary-cluster. Is it possible to somehow obtain a reference to the `DaskExecutor`’s distributed client (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/executors/dask.py#L209) from within our Flow code (or Task code) so that we can call register_worker_plugin? I’ve browsed through the codebase, but I’m not sure where in the Flow execution process the `DaskExecutor`’s client is yielded and used.
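One route, sketched on the assumption that tasks execute on the Dask workers themselves: distributed.get_client(), called from code running on a worker, returns a client connected to that cluster, so the plugin can be registered from inside a task:

import distributed
from distributed.diagnostics.plugin import PipInstall
from prefect import task

@task
def install_worker_deps():
    # Inside a task on a Dask worker, get_client() reaches the
    # temporary cluster the DaskExecutor created.
    client = distributed.get_client()
    client.register_worker_plugin(PipInstall(packages=["xarray"]))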
Trevor Kramer
04/09/2021, 8:50 PM
KIRYL BUCHA
04/10/2021, 11:51 AM
Trevor Kramer
04/10/2021, 1:42 PM
Slackbot
04/10/2021, 5:44 PM
Sean Harkins
04/11/2021, 11:17 PM
Following up on my question above: we can register dependencies via WorkerPlugins. To test this initially, I applied our decorator to a test task:
import distributed
import prefect
from distributed.diagnostics.plugin import PipInstall
from functools import wraps
from prefect import task

def register_plugin(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Register the pip-install plugin on the cluster before running the task body.
        client = distributed.get_client()
        plugin = PipInstall(packages=["xarray"])
        client.register_worker_plugin(plugin)
        result = func(*args, **kwargs)
        return result
    return wrapper

@task
@register_plugin
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"
This works as expected and the Dask worker logs show the plugin use.
[2021-04-11 23:11:49+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Starting task run...
distributed.worker - INFO - Starting Worker plugin pip
distributed.diagnostics.plugin - INFO - Pip installing the following packages: ['xarray']
[2021-04-11 23:11:50+0000] INFO - prefect.say_hello | Hello, Cloud
[2021-04-11 23:11:50+0000] INFO - prefect.CloudTaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
However, we don’t have access to the underlying tasks, so instead we need to access them from an existing flow, wrap them, and replace them within the flow prior to flow registration with:
# Iterate over a copy, since flow.replace mutates flow.tasks.
for flow_task in list(flow.tasks):
    wrapped_task = register_plugin(flow_task)
    flow.replace(flow_task, wrapped_task)
But using this approach seems to alter the task’s execution, as the worker logs report neither the plugin use nor the Prefect logger statements.
distributed.core - INFO - Starting established connection
[2021-04-11 22:43:26+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Starting task run...
[2021-04-11 22:43:27+0000] INFO - prefect.CloudTaskRunner | Task 'Constant[function]': Finished task run for task with final state: 'Success'
Am I missing something obvious in my task wrapping approach and replacement in the flow? Is there a better approach for accomplishing this?
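A hedged reading of those logs: register_plugin is a function decorator, so applying it to a Task instance returns a plain function, which Prefect re-wraps as Constant[function] rather than running the original task; that matches the 'Constant[function]' names above. One alternative sketch, untested against Prefect internals, wraps each task's run method in place so the flow still holds real Task objects:

# Wrap run rather than replacing the Task itself.
for flow_task in flow.tasks:
    flow_task.run = register_plugin(flow_task.run)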
Svyat
04/12/2021, 12:02 AM
Rob Fowler
04/12/2021, 3:49 AM
Rob Fowler
04/12/2021, 9:49 AM
docker run -it prefecthq/prefect:latest-python3.6 /bin/bash
root@c281a603b87e:/# prefect version
0.14.12
xyzz
04/12/2021, 11:38 AM
Jérémy Trudel
04/12/2021, 12:52 PM
prefect register --project "my project" --path myflow.py
All it returns is:
Project 'my project' does not exist
I know it does not exist, since I'm trying to register it. Is there something I'm missing?
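For what it's worth, in Prefect 0.14.x, register does not create projects implicitly; the project must exist first, e.g. via prefect create project "my project" on the CLI. The same thing in Python, assuming default backend configuration:

from prefect import Client

# Create the project once; registration into it then succeeds.
Client().create_project(project_name="my project")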
Philip MacMenamin
04/12/2021, 1:23 PM
I know I can raise signals.FAIL within the task, but for some tasks I just want to exit a flow if they don't work, and not have to mark every other task as dependent on that task succeeding. Anything I can look at?
ciaran
04/12/2021, 2:25 PM