Krzysztof Nawara
10/06/2020, 5:39 PM
Dolor Oculus
10/06/2020, 6:05 PM
PREFECT__SERVER__ENDPOINT (in bash, before invoking Python). Are there other ways of configuring this (i.e. via constructor arguments at runtime)? ty!
Marwan Sarieddine
10/06/2020, 6:36 PM
Mike Fransesco
10/06/2020, 7:05 PM
Chirag
10/07/2020, 1:14 PM
Sven Teresniak
10/07/2020, 2:39 PM
[2020-10-07 14:26:37,427] ERROR - agent1 | 400 Client Error: Bad Request for url: <http://localhost:4200/>
But it is still possible to register flows as usual. I see them in the UI, too.
I switched back to v0.13.9 and the error is gone. I only changed the Prefect version -- no configuration and no flow code.
I'd like to provide you with more information about this bug, but the agent seems to be ignoring $PREFECT__LOGGING__LEVEL=DEBUG.
Is there any change in the environment for the agent? Why can I register flows but not start them?
Matias Godoy
10/07/2020, 3:47 PM
dev and the other with label prod), which also means that I'm using Docker Storage for the flows.
Inside my flow I set the labels like this:
flow.environment = LocalEnvironment(labels=['dev'])
The thing is that I'd like the flow label to be "dynamic" in the sense that if I register that flow from a development environment, the label is automatically set to dev, but if I want to register it for production, I'd like the flow label to be prod.
For now I register the flows from my own laptop, which automatically generates the Docker image and uploads it to my ECR for the Agents to use.
Maybe I'm wrong about this, but I guess that if I set an environment variable on my local computer and make the flow code something like flow.environment = LocalEnvironment(labels=[my_env_var]), it will not work, because that variable will not exist in the container that runs the flow. Is this correct?
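A minimal sketch of the environment-variable idea above, using only the standard library. The variable name FLOW_ENV and the list of allowed labels are hypothetical and do not come from Prefect or from this message. The script runs on the registering machine, so the variable is read there; whether the resolved label then travels with the stored flow depends on how the storage serializes it, which is worth verifying rather than assuming.

```python
import os

# Hypothetical variable name and allowed values; neither comes from Prefect.
ENV_VAR = "FLOW_ENV"
ALLOWED_LABELS = ("dev", "prod")


def resolve_label(environ=None, default="dev"):
    """Return the label for this registration, read from the local environment."""
    environ = os.environ if environ is None else environ
    label = environ.get(ENV_VAR, default)
    if label not in ALLOWED_LABELS:
        raise ValueError(f"{ENV_VAR} must be one of {ALLOWED_LABELS}, got {label!r}")
    return label


# In the flow file this would feed the existing line, e.g.:
# flow.environment = LocalEnvironment(labels=[resolve_label()])
```

Registering with FLOW_ENV=prod set in the shell would then pick the prod label, and leaving it unset would fall back to dev.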
Another alternative would be to set flow labels when registering the flow using the CLI, something like prefect register flow --file my_flow.py --name My-Flow --label dev, but according to the CLI help, --label is not a valid parameter when registering flows.
Do you have any recommendations for setting flow labels dynamically depending on the environment they were registered from?
Thanks!
Thomas La Piana
10/07/2020, 4:22 PM
josh
10/07/2020, 4:27 PM
0.13.10 has been released and here are a few notable changes:
📝 Added utilities for dynamically naming flow and task runs
🕵️♀️ Enabled agent registration for Server
🌐 Allowed for specifying an import path for Flows in Local Storage
A big thank you to our contributors who helped out with this release! Full changelog:
Konstantinos
10/07/2020, 6:18 PM
Nakul Gowdra
10/08/2020, 1:58 AM
Rob Fowler
10/08/2020, 4:41 AM
Nejc Vesel
10/08/2020, 9:11 AM
from prefect.utilities.configuration import set_temporary_config
def test_prefect(input_folder, output_folder):
    with set_temporary_config({"home_dir": output_folder}):
        exec_config = ExecutionConfig(dataset_location=os.path.join(get_path('parquet'), 'pq-dataset'),
                                      save_location=os.path.join(output_folder, 'mean_ndvi_results'))
        status = flow.run(executor=LocalExecutor(), flow_config=asdict(exec_config))
        assert status.is_successful()
(took a look at some tests in the prefect source), but it doesn't work and still writes LocalResult results to ~/.prefect
Nuno Silva
10/08/2020, 10:53 AM
table_copy (Mapped Child 5) instead of e.g. table_copy_visits.
I'm running something like task_run_name=lambda **kwargs: f"table_copy_{kwargs['name']}", where name is a function parameter.
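To make the expectation concrete, this is what the callable itself produces when called with keyword arguments. It only exercises the lambda in plain Python and assumes, as the message implies but without verifying it here, that the task's inputs arrive as keyword arguments. The map_index key is a hypothetical extra, included only to show that unused keys are ignored.

```python
# The same callable as in the message, checked outside Prefect.
task_run_name = lambda **kwargs: f"table_copy_{kwargs['name']}"

# Simulated inputs for one mapped child; extra keys are ignored by the lambda.
example_inputs = {"name": "visits", "map_index": 5}

print(task_run_name(**example_inputs))  # prints "table_copy_visits"
```

If the callable behaves like this locally, the default-looking name in the UI points at how the name is stored or displayed, not at the lambda.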
The flow gets registered successfully with 0.13.10.
Could it be that the Prefect UI is not yet querying task_run_name?
Thanks
Anish Chhaparwal
10/08/2020, 1:59 PM
Iain Dillingham
10/08/2020, 2:18 PM
Alfie
10/08/2020, 2:53 PM
tom
10/08/2020, 5:42 PM
def fetch_object_ids():
    # Long running task with many object_ids returning.
    cursor = None
    while True:
        # Keep fetching and yielding object IDs until we're exhausted.
        obj_ids, cursor = external_request_to_fetch_object_ids(cursor)
        for obj_id in obj_ids:
            yield obj_id
        if cursor is None:
            break
def process_object(object_id):
    do_something_with_object(object_id)
with Flow("Process Objects") as flow:
    object_ids = fetch_object_ids()  # I don't want to wait here / keep this list in memory
    process_object.map(object_ids)
    send_finished_email_to_user()  # Should depend on all objects processed.
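One way to avoid holding every ID in memory is to consume the generator in fixed-size batches, so each batch can be processed before the next one is fetched. This is a plain-Python sketch, not a Prefect feature: fake_fetch_object_ids, the batch size, and the multiplication that stands in for processing are all hypothetical, and wiring this into mapped tasks is left open.

```python
from itertools import islice


def fake_fetch_object_ids(total=7):
    """Hypothetical stand-in for the paginated generator in the message."""
    for obj_id in range(total):
        yield obj_id


def batched(iterable, size):
    """Yield lists of at most `size` items, pulling lazily from `iterable`."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


processed = []
for batch in batched(fake_fetch_object_ids(), size=3):
    # Only one batch is held in memory at a time.
    processed.extend(obj_id * 10 for obj_id in batch)

print(processed)
```

The final notification step would then run once the loop over batches has finished.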
Emma Willemsma
10/08/2020, 8:40 PM
from prefect import task, Flow
from prefect.environments.storage import Docker
import boto3
@task(log_stdout=True)
def use_boto3():
    print('Using boto3 version {}'.format(boto3.__version__))
with Flow('Sample Flow') as flow:
    use_boto3()
storage = Docker(python_dependencies=['boto3'])
storage.add_flow(flow)
storage.build()
flow.register(project_name='Dev', build=False)
I would like to be able to run this to register my flow without having to have boto3 installed (since it will only be used during the flow run anyway).
Robin
10/08/2020, 9:27 PM
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 128, in call_runner_target_handlers
    cache_for=self.task.cache_for,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 1321, in set_task_run_state
    version=version,
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State update failed for task run ID 3417baee-44a2-4b39-82f4-c6ac6d073d1e: provided a running state but associated flow run 51cc335d-f029-45c6-80b4-8c88a0173dbc is not in a running state.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Has anyone already experienced this?
How to debug it? 😕
Hui Zheng
10/09/2020, 12:01 AM
Rescheduled by a Lazarus process.
For example, the one in the screenshot is scheduled for 11:10, but actually didn’t start until 11:20. Could anyone help me understand why this happens and how to prevent it? We are building a new flow which needs to run every 10 minutes with a very strict SLA, and a 10-minute delay would be fatal to it. Thank you
ale
10/09/2020, 9:12 AM
Robin
10/09/2020, 11:49 AM
env_vars={"PREFECT__LOGGING__LEVEL": "DEBUG"}, in flow.storage = Docker(...).
However, we still don't see the debug level log messages in cloud 🤔
Do we need to change anything else?
ale
10/09/2020, 2:05 PM
James Phoenix
10/09/2020, 2:46 PM
flavienbwk
10/09/2020, 3:02 PM
Pedro Machado
10/09/2020, 3:28 PM
prefect agent start docker (passing several env variables) with the recipe created by prefect agent install local, but I get a permission denied error and the agent never starts. I am able to start the agent without supervisord and it works fine. Any ideas? Logs in thread.
John Song
10/09/2020, 4:31 PM
Brett Naul
10/09/2020, 6:57 PM
Krzysztof Nawara
10/09/2020, 7:26 PM
all_inputs cache validator is meant to work.
Cache validator receives 3 arguments, but only 2 of those are relevant here:
- state (State): a `Success` state from the last successful Task run that contains the cache
- inputs (dict): a `dict` of inputs that were available on the last successful run of the cached Task
Now my current understanding (almost certainly incorrect) is that they come from the same run. But then the logic of the validator wouldn't make any sense:
elif getattr(state, "hashed_inputs", None) is not None:
    if state.hashed_inputs == {key: tokenize(val) for key, val in inputs.items()}:
        return True
    else:
        return False
elif {key: res.value for key, res in state.cached_inputs.items()} == inputs:
    return True
It just compares the inputs passed directly to the validator with the inputs extracted from the state.
So it's pretty clear those 2 arguments can come from different runs, but I don't understand how that is possible. If someone could provide an explanation I'd be very grateful 🙂
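One reading that would make the comparison meaningful is that `state` carries the inputs stored with the cached result, while `inputs` holds the values of the run being checked now. That is an assumption, not something confirmed in this thread. The sketch below mimics only the hashed branch of the snippet, using stand-ins: `CachedState` and the repr-based `tokenize` are hypothetical and are not Prefect's implementations.

```python
import hashlib
from dataclasses import dataclass
from typing import Any, Dict


def tokenize(value: Any) -> str:
    """Hypothetical stand-in for a deterministic hash of a value."""
    return hashlib.md5(repr(value).encode("utf-8")).hexdigest()


@dataclass
class CachedState:
    """Hypothetical stand-in for the Success state saved by an earlier run."""
    hashed_inputs: Dict[str, str]


def all_inputs_sketch(state: CachedState, inputs: Dict[str, Any]) -> bool:
    """Cache is valid only if the current inputs hash to the stored hashes."""
    current = {key: tokenize(val) for key, val in inputs.items()}
    return state.hashed_inputs == current


# Earlier run stored hashes of its inputs alongside the cached result.
earlier = CachedState(hashed_inputs={"x": tokenize(1), "y": tokenize("a")})

print(all_inputs_sketch(earlier, {"x": 1, "y": "a"}))  # same inputs
print(all_inputs_sketch(earlier, {"x": 2, "y": "a"}))  # changed input
```

Under that reading, the first call reuses the cache and the second does not, which is the only situation in which comparing the two arguments can give different answers.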