Seth Coussens
07/18/2021, 11:03 PM
prefect agent docker start --token <token> --name "EMG-DCK-01" --volume company:/mnt/company --label production --log-level DEBUG --show-flow-logs
Any flow using this agent then shows that directory as empty.
When I manually run the container (on the same account, terminal window, etc. as the agent) and mount it with -v using the same syntax, it works fine and the volume has the expected files:
docker run -it -v company:/mnt/company node:lts bash
cd /mnt/company
ls
**shows all files**
Any ideas why the volume would not work when mounted via the agent configuration?
This is what the company volume configuration looks like:
PS C:\Users\azure.automation> docker volume inspect company
[
    {
        "CreatedAt": "2021-07-17T22:43:31Z",
        "Driver": "local",
        "Labels": {},
        "Mountpoint": "/var/lib/docker/volumes/company/_data",
        "Name": "company",
        "Options": {
            "device": "//192.168.1.2/Company",
            "o": "user=myusername,domain=mydomain,password=mypassword",
            "type": "cifs"
        },
        "Scope": "local"
    }
]
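Not from the thread, but a quick way to check whether the agent-level --volume actually reaches the flow-run container is to list the mount point from inside a task; a minimal sketch (the flow name is made up, the path is taken from the command above):
import os
from prefect import task, Flow

@task(log_stdout=True)
def inspect_mount(path="/mnt/company"):
    # Print what the flow-run container actually sees at the mount point.
    print(path, os.path.exists(path), os.listdir(path) if os.path.isdir(path) else "not mounted")

with Flow("volume-debug") as flow:
    inspect_mount()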
Zach Schumacher
07/19/2021, 2:53 PM
Kevin Kho
Fabrice Toussaint
07/19/2021, 4:24 PM
Traceback (most recent call last):
  File "runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "\executors.py", line 537, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
[2021-07-19 18:23:21+0200] ERROR - prefect.ETL | Unexpected error occured in FlowRunner: KeyError(0)
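For context (not from the thread): the failing line indexes the upstream task's result per map index, so the upstream of a .map() call is expected to return a list-like value. A minimal mapped flow in Prefect 0.x looks roughly like this (names are illustrative):
from prefect import task, Flow

@task
def make_items():
    # The upstream of a .map() call should return an iterable.
    return [1, 2, 3]

@task
def double(x):
    return x * 2

with Flow("ETL") as flow:
    doubled = double.map(make_items())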
Hugo Shi
07/19/2021, 6:19 PM
Leon Kozlowski
07/19/2021, 7:15 PM
Matthew Alhonte
07/19/2021, 8:43 PM
Harry Baker
07/19/2021, 8:47 PM
Erik Amundson
07/19/2021, 10:17 PM
We're running a flow with the DaskExecutor on GKE with cluster class dask_kubernetes.KubeCluster, and it seems to be dropping 1-2 mapped children per run. It's like the scheduler doesn't realize they exist, or is losing track somehow; there is no error message in the logs. This prevents the flow from proceeding to the downstream tasks, so I end up having to cancel the flow. So far it's shown the same behavior on all four test runs. If it matters, we're running on Prefect 0.14.16.
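Not the poster's code, but for reference, a temporary Dask cluster executor along those lines is typically configured like this in Prefect 0.14 (the pod spec path and adapt bounds are placeholders):
from prefect.executors import DaskExecutor

executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"pod_template": "worker-pod.yaml"},  # placeholder pod spec
    adapt_kwargs={"minimum": 2, "maximum": 10},
)
# attach it to the flow before registering, e.g. flow.executor = executor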
Hugo Kitano
07/19/2021, 10:21 PM
Nishtha Varshney
07/20/2021, 8:06 AM
Bruno Murino
07/20/2021, 11:02 AM
An Hoang
07/20/2021, 1:26 PM
./test_prefect is created but nothing is written to it. Here's my very simple code:
## test_prefect.py
from prefect import task, Flow
from prefect.engine.results import LocalResult

result = LocalResult(dir="./test_prefect")

@task(target="func_task_target.txt", checkpoint=True, result=result)
def test(val):
    return val

with Flow("simple_test", result=result) as test_flow:
    result = test(99)

test_flow.run()
I did set PREFECT__FLOWS__CHECKPOINTING=true before doing python test_prefect.py.
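As a quick check (not part of the original message): assuming LocalResult joins the task's target with its dir, the checkpoint should land at ./test_prefect/func_task_target.txt once checkpointing is on:
from pathlib import Path

# Hypothetical verification of where the checkpointed target should appear.
print(Path("./test_prefect/func_task_target.txt").exists())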
Michael Warnock
07/20/2021, 2:38 PM
Error grabbing logs: unexpected EOF
<- Has anyone seen this in their (docker) agent output? It happened after two frames of stacktrace for me, on a mapped task (149 of 876), and the agent stopped processing anything (or at least stopped sending anything to stdout), and the rest of the tasks didn't run.
Grepping for "grabbing logs" turns up nothing in the prefect repo, our own code, or any of the dependencies in my virtualenv. It's a cloud-backed flow run; is this coming from Cloud, maybe?
An Hoang
07/20/2021, 2:58 PM
Laura Vaida
07/20/2021, 3:08 PM
Sean Talia
07/20/2021, 3:38 PM
Daniel Burkhardt
07/20/2021, 3:45 PM
I'm using {map_index} in the Result location, and it seems like the output of first_map_fn[0] always gets passed to second_map_fn[0], even if first_map_fn[1] finishes first. Is this guaranteed? Is there a better way to identify which Result is used as input to a specific task run?
Please find example flow in thread.
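Not the thread's example, but a sketch of the kind of templated Result location being described; the keys are filled from prefect.context at runtime, so each mapped child writes to its own identifiable file (the dir and filename pattern are made up):
from prefect import task
from prefect.engine.results import LocalResult

result = LocalResult(
    dir="./results",
    location="{flow_name}/{task_name}/{map_index}.prefect",
)

@task(result=result, checkpoint=True)
def second_map_fn(x):
    return x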
Jared Noynaert
07/20/2021, 3:57 PM
Clemens
07/20/2021, 4:26 PM
I can't get a task with the run_always trigger to run, well, always. Example code in thread.
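For reference (not the thread's code): the built-in trigger is exposed as always_run in prefect.triggers and is set per task; a minimal sketch with made-up task names:
from prefect import task, Flow
from prefect.triggers import always_run

@task
def might_fail():
    raise ValueError("boom")

@task(trigger=always_run)
def cleanup():
    # Runs regardless of the upstream task's final state.
    print("cleaning up")

with Flow("trigger-demo") as flow:
    cleanup(upstream_tasks=[might_fail()])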
Mehdi Nazari
07/20/2021, 4:41 PM
prefect agent docker start --key "SERVICE_ACCOUNT_KEY" --name "AGENT_NAME"
I was also able to write a simple Flow and register it with Docker storage.
The Cloud UI also recognizes the Flow as registered, but upon executing it, I'm getting the error below on the agent:
docker.errors.APIError: 400 Client Error for <http+docker://localhost/v1.40/containers/create?name=nickel-okapi>: Bad Request ("invalid IP address in add-host: "host-gateway"")
What am I missing? Any troubleshooting I can do?
Vincent
07/20/2021, 4:51 PM
With wait=True for StartFlowRun tasks, the result objects are not stored at the target store. I want to kick off a flow of flows, such that the StartFlowRun tasks save a result object to avoid resubmission of the flow. wait=False restores the expected behavior. Are there any suggestions for having dependent flows with checkpointing?
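Not from the thread, but roughly the pattern being described: a StartFlowRun task given a result and a target so that, once checkpointed, a re-run of the parent can skip resubmitting the child (flow name, project name, and target pattern are placeholders):
from prefect import Flow
from prefect.engine.results import LocalResult
from prefect.tasks.prefect import StartFlowRun

start_child = StartFlowRun(
    flow_name="child-flow",       # placeholder
    project_name="my-project",    # placeholder
    wait=True,
    result=LocalResult(dir="./results"),
    target="child-flow-{today}",  # templated from prefect.context
)

with Flow("flow-of-flows") as flow:
    start_child()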
Sean Harkins
07/20/2021, 5:14 PM
We need to create a Cloud Hook for the Flow success event, and this needs to be done programmatically via the API at registration time. The documentation states: "Psst! We have recently added Automations which offer more functionality than Cloud Hooks and will eventually replace them."
There does not seem to currently be programmatic API access to Automations.
1. Is there a way to register an Automation via an API and if so is it documented somewhere?
2. When is the planned deprecation of Cloud Hooks? We will need to support a mix of self-hosted Prefect Server instances and Prefect Cloud usage, so we would like a solution that supports both, but it appears that Automations will not be available via Prefect Server.
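Not an answer from the thread, and only for the Cloud Hook side of question 1: Cloud Hooks can be created through the GraphQL API with the Prefect Client. The mutation below follows the shape shown in the Cloud Hooks docs as I recall it, so the field names should be double-checked; the webhook URL and version group ID are placeholders:
from prefect.client import Client

client = Client()
client.graphql(
    """
    mutation {
      create_cloud_hook(input: {
        type: WEBHOOK,
        name: "flow-success-hook",
        version_group_id: "<version-group-id>",
        states: ["Success"],
        config: { url: "https://example.com/hook" }
      }) {
        id
      }
    }
    """
)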
Hugo Kitano
07/20/2021, 7:20 PM
Dave Nielsen
07/20/2021, 7:42 PM
Thomas Nakamoto
07/20/2021, 8:20 PM
import os
import requests

def post_to_slack(task, old_state, new_state):
    # https://github.com/PrefectHQ/prefect/blob/d61fa6aac9330c5817cc8e8b8f8cca2d634ea7e1/src/prefect/engine/state.py
    if new_state.is_retrying():
        msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
        requests.post("https://hooks.slack.com/services/", json={"text": msg})
    if new_state.is_failed():
        msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
        requests.post("https://hooks.slack.com/services/", json={"text": msg})
    # if new_state.is_successful():
    #     msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
    #     requests.post("https://hooks.slack.com/services/", json={"text": msg})
    return new_state
I would like to avoid sending a success message for each task, and instead send one only if every task ran successfully. Thanks!
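Not from the thread, but one common way to get a single notification is a flow-level state handler, which is called on the flow run's state changes rather than per task; a rough sketch reusing the webhook from the snippet above:
import requests
from prefect import Flow

def notify_flow_success(flow, old_state, new_state):
    # Fires when the whole flow run reaches a successful terminal state,
    # so a single message is sent only if the run finished successfully.
    if new_state.is_successful():
        requests.post(
            "https://hooks.slack.com/services/",
            json={"text": f"<@XXXX> {flow.name} finished successfully"},
        )
    return new_state

with Flow("my-flow", state_handlers=[notify_flow_success]) as flow:
    ...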
Cliff Wells
07/20/2021, 8:34 PM
Justin Liu
07/20/2021, 8:41 PM
Failed to load and execute Flow's environment: ValidationError({'type': ['Unsupported value: Git']})
This is our code; it works with GitHub storage:
import prefect
from prefect.storage import Git
from prefect.run_configs import ECSRun, LocalRun
from prefect import task, Flow, Parameter
from prefect.client import Secret

RUN_CONFIG = ECSRun(image='image/image', cpu='1 vcpu', memory='2 GB')
STORAGE = Git(repo='name/repo', flow_path='path_to_this_file', git_token_secret_name='token')

@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hi")

with Flow("git-storage", storage=STORAGE, run_config=RUN_CONFIG) as flow:
    say_hello()
Ben Muller
07/20/2021, 9:32 PM
Eric Mauser
07/20/2021, 10:04 PM