Michael Warnock
07/17/2021, 4:12 PM
My repo feature-generator contains both worker/orchestration logic and the code for doing the work. I added task and flow definitions to it, but with GitHub storage the flow can't find the other modules in that repo (I've seen https://github.com/PrefectHQ/prefect/discussions/4776 and understand this is intentional).
My question is how best to structure things so that my flow can use that repo's code, but also execute a parameterized run from feature-generator, on commit, through CI (because that's how we start the job right now). Obviously, I can make feature-generator a package and depend on it from a new flows repo, but to have feature-generator start the run would create a circular dependency. Would you split it into three repos, with one of them just being responsible for executing the flow? I don't love that idea, but maybe that's best practice?
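A minimal sketch of the two-repo variant, assuming feature-generator is made pip-installable and pre-installed in the execution environment (all module and function names below are hypothetical): the flow file in the flows repo only imports the package, so GitHub storage only needs to load that one file.

# flows/feature_flow.py -- hypothetical layout
from prefect import Flow, Parameter, task

from feature_generator.work import generate_features  # hypothetical import

@task
def run_generation(config):
    # Delegate the real work to the installed feature-generator package.
    return generate_features(config)

with Flow("feature-generation") as flow:
    config = Parameter("config", default={})
    run_generation(config)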
Michael Warnock
07/17/2021, 4:21 PM

haven
07/18/2021, 6:00 AM
I see that KubernetesSecret should not rely on any upstream task/parameters. However, I think that might not be a great assumption: I'm using an env parameter that denotes "DEV" or "PROD" and would then decide which secret I want to retrieve from k8s later (usually a database secret). Any plan to allow KubernetesSecret to be a dynamic task, interpreted naturally in the flow, without having to manually use flow.set_dependencies?
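In the meantime, a workaround sketch (a plain task using the kubernetes client rather than the built-in KubernetesSecret task), so the secret name can depend on the upstream env parameter; the secret, key, and namespace names are illustrative:

import base64

from kubernetes import client, config
from prefect import Flow, Parameter, task

@task
def get_db_secret(env: str) -> str:
    # Load in-cluster credentials (use config.load_kube_config() locally).
    config.load_incluster_config()
    name = "db-secret-dev" if env == "DEV" else "db-secret-prod"  # illustrative names
    secret = client.CoreV1Api().read_namespaced_secret(name, "default")
    return base64.b64decode(secret.data["password"]).decode()

with Flow("dynamic-secret") as flow:
    env = Parameter("env", default="DEV")
    password = get_db_secret(env)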
Austen Bouza
07/18/2021, 5:59 PM
Is there a way to use DictCursor with the existing SnowflakeQuery task? The source shows:
try:
    with conn:
        with conn.cursor() as cursor:
            executed = cursor.execute(query, params=data).fetchall()
    conn.close()
    return executed
while what I would want to use is:
try:
    with conn:
        with conn.cursor(DictCursor) as cursor:
            executed = cursor.execute(query, params=data).fetchall()
    conn.close()
    return executed
Has anyone else dealt with this before? I’d like to simply inject the DictCursor class if possible, but from the way this block of code is written, it looks like the only way to do it is to subclass SnowflakeQuery and override the entire run method.
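For what it's worth, the connector itself accepts the cursor class directly, so a subclass would only need that one-line change. A minimal sketch against snowflake-connector-python (connection parameters are placeholders):

import snowflake.connector
from snowflake.connector import DictCursor

conn = snowflake.connector.connect(
    account="my_account",  # placeholder credentials
    user="my_user",
    password="my_password",
)
try:
    with conn:
        # Passing the cursor class makes fetchall() return dicts
        # keyed by column name instead of tuples.
        with conn.cursor(DictCursor) as cursor:
            rows = cursor.execute("SELECT 1 AS n").fetchall()
finally:
    conn.close()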
Seth Coussens
07/18/2021, 11:03 PM
prefect agent docker start --token <token> --name "EMG-DCK-01" --volume company:/mnt/company --label production --log-level DEBUG --show-flow-logs
Any flow using this agent then shows that directory as empty.
When I manually run the container (on the same account, terminal window, etc. as the agent) and mount the volume with -v using the same syntax, it tests out fine and the volume has the expected files:
docker run -it -v company:/mnt/company node:lts bash
cd /mnt/company
ls
**shows all files**
Any ideas why the volume when mounted with the agent configuration would not be working?
This is what the company volume configuration looks like:
PS C:\Users\azure.automation> docker volume inspect company
[
    {
        "CreatedAt": "2021-07-17T22:43:31Z",
        "Driver": "local",
        "Labels": {},
        "Mountpoint": "/var/lib/docker/volumes/company/_data",
        "Name": "company",
        "Options": {
            "device": "//192.168.1.2/Company",
            "o": "user=myusername,domain=mydomain,password=mypassword",
            "type": "cifs"
        },
        "Scope": "local"
    }
]
Zach Schumacher
07/19/2021, 2:53 PM

Kevin Kho
07/19/2021, 3:26 PM

Fabrice Toussaint
07/19/2021, 4:24 PM
Traceback (most recent call last):
  File "runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "flow_runner.py", line 569, in get_flow_run_state
    executors.prepare_upstream_states_for_mapping(
  File "\executors.py", line 537, in prepare_upstream_states_for_mapping
    value = upstream_state.result[i]
KeyError: 0
[2021-07-19 18:23:21+0200] ERROR - prefect.ETL | Unexpected error occured in FlowRunner: KeyError(0)
Hugo Shi
07/19/2021, 6:19 PM

Leon Kozlowski
07/19/2021, 7:15 PM

Matthew Alhonte
07/19/2021, 8:43 PM

Harry Baker
07/19/2021, 8:47 PM

Erik Amundson
07/19/2021, 10:17 PM
We're running a DaskExecutor on GKE with cluster class dask_kubernetes.KubeCluster, and it seems to be dropping 1-2 mapped children per run. It's like the scheduler doesn't realize they exist, or is losing track somehow; there is no error message in the logs. This prevents the flow from proceeding to the downstream tasks, so I end up having to cancel the flow. So far it has shown the same behavior on all four test runs. If it matters, we're running on prefect 0.14.16.
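For context, a minimal sketch of the executor setup being described (the flow name, image, and pod spec are illustrative):

from dask_kubernetes import KubeCluster, make_pod_spec
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("gke-mapped-flow") as flow:  # illustrative flow
    ...

# One Dask cluster per flow run; the pod template is illustrative and
# should match the image/resources of your workers.
flow.executor = DaskExecutor(
    cluster_class=KubeCluster,
    cluster_kwargs={"pod_template": make_pod_spec(image="prefecthq/prefect:0.14.16")},
)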
Hugo Kitano
07/19/2021, 10:21 PM

Nishtha Varshney
07/20/2021, 8:06 AM

Bruno Murino
07/20/2021, 11:02 AM

An Hoang
07/20/2021, 1:26 PM
The folder ./test_prefect is created, but nothing is written to it. Here's my very simple code:
## test_prefect.py
from prefect import Flow, task
from prefect.engine.results import LocalResult

result = LocalResult(dir="./test_prefect")

@task(target="func_task_target.txt", checkpoint=True, result=result)
def test(val):
    return val

with Flow("simple_test", result=result) as test_flow:
    result = test(99)

test_flow.run()
I did set PREFECT__FLOWS__CHECKPOINTING=true before running python test_prefect.py.
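A quick sanity check after the run, as a sketch (it assumes the target filename is resolved relative to the LocalResult dir):

from pathlib import Path

# Assumption: the task target is joined onto the LocalResult dir, so a
# successful checkpoint should create this file.
print(Path("./test_prefect/func_task_target.txt").exists())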
Michael Warnock
07/20/2021, 2:38 PM
Error grabbing logs: unexpected EOF <- has anyone seen this in their (docker) agent output? It happened after two frames of stack trace for me, on a mapped task (149 of 876), and the agent stopped processing anything (or at least stopped sending anything to stdout, and the rest of the tasks didn't run).
Grepping for "grabbing logs" turns up nothing in the prefect repo, our own code, or any of the dependencies in my virtualenv. It's a cloud-backed flow run; is this coming from Cloud, maybe?

An Hoang
07/20/2021, 2:58 PM

Laura Vaida
07/20/2021, 3:08 PM

Sean Talia
07/20/2021, 3:38 PM

Daniel Burkhardt
07/20/2021, 3:45 PM
I'm using {map_index} in the Result location, and it seems like the output of first_map_fn[0] always gets passed to second_map_fn[0] even if first_map_fn[1] finishes first. Is this guaranteed? Is there a better way to identify which Result is used as input to a specific task run?
Please find an example flow in the thread.
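For context, a minimal sketch of the templating pattern being described (directories and task names are illustrative; it assumes {map_index} is available to the location template for mapped runs):

from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(result=LocalResult(dir="./results", location="first-{map_index}.txt"))
def first_map_fn(x):
    return x * 2

@task(result=LocalResult(dir="./results", location="second-{map_index}.txt"))
def second_map_fn(x):
    return x + 1

with Flow("map-results") as flow:
    # Each mapped child writes its result to a location templated
    # with its own map_index.
    second_map_fn.map(first_map_fn.map([1, 2, 3]))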
Jared Noynaert
07/20/2021, 3:57 PM

Clemens
07/20/2021, 4:26 PM
I can't get the run_always trigger to run, well, always.
Example code in thread
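A minimal sketch of the usual attachment pattern, for reference (in the Prefect releases I've used, the trigger is exposed as always_run in prefect.triggers; adjust to your version):

from prefect import Flow, task
from prefect.triggers import always_run

@task
def may_fail():
    raise RuntimeError("boom")

@task(trigger=always_run)
def cleanup():
    # Runs no matter what state the upstream task finishes in.
    print("cleaning up")

with Flow("always-run-demo") as flow:
    cleanup(upstream_tasks=[may_fail()])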
Mehdi Nazari
07/20/2021, 4:41 PM
I was able to start an agent with: prefect agent docker start --key "SERVICE_ACCOUNT_KEY" --name "AGENT_NAME"
I was also able to write a simple Flow and register it with Docker storage. The Cloud UI recognizes the Flow as registered, but upon executing it, I get the error below on the agent:
docker.errors.APIError: 400 Client Error for http+docker://localhost/v1.40/containers/create?name=nickel-okapi: Bad Request ("invalid IP address in add-host: "host-gateway"")
What am I missing? Any troubleshooting I can do?
Vincent
07/20/2021, 4:51 PM
I noticed that with wait=True for StartFlowRun tasks, the result objects are not stored at the target store. I want to kick off a flow of flows such that the StartFlowRun tasks save a result object to avoid resubmission of the flow; wait=False restores the expected behavior. Are there any suggestions for running dependent flows with checkpointing?
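For reference, a minimal sketch of the flow-of-flows pattern being described (project and flow names are illustrative):

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# wait=True blocks the parent task until the child flow run finishes.
start_child = StartFlowRun(
    flow_name="child-flow",
    project_name="my-project",
    wait=True,
)

with Flow("parent-flow") as parent_flow:
    start_child()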
Sean Harkins
07/20/2021, 5:14 PM
I would like to create a Cloud Hook for the Flow success event. This needs to be done programmatically via the API at registration time. The documentation states: "Psst! We have recently added Automations, which offer more functionality than Cloud Hooks and will eventually replace them." There does not seem to currently be programmatic API access to Automations.
1. Is there a way to register an Automation via an API, and if so, is it documented somewhere?
2. When is the planned deprecation of Cloud Hooks? We will need to support a mix of self-hosted Prefect Server instances and Prefect Cloud usage, so we would like a solution that supports both, but it appears that Automations will not be available via Prefect Server.
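For anyone scripting this, a sketch of a GraphQL call through the Prefect client; the create_cloud_hook mutation name and its input fields are an assumption recalled from the Cloud Hooks docs, so verify them against the current API schema before relying on this:

from prefect.client import Client

client = Client()
# Assumed mutation and fields -- verify against the Cloud API schema.
client.graphql(
    """
    mutation($input: create_cloud_hook_input!) {
      create_cloud_hook(input: $input) { id }
    }
    """,
    variables={
        "input": {
            "type": "WEBHOOK",
            "name": "flow-success-hook",
            "version_group_id": "<flow-version-group-id>",
            "states": ["Success"],
            "config": {"url": "https://example.com/hook"},
        }
    },
)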
Hugo Kitano
07/20/2021, 7:20 PM

Dave Nielsen
07/20/2021, 7:42 PM

Thomas Nakamoto
07/20/2021, 8:20 PM
import os
import requests

def post_to_slack(task, old_state, new_state):
    # https://github.com/PrefectHQ/prefect/blob/d61fa6aac9330c5817cc8e8b8f8cca2d634ea7e1/src/prefect/engine/state.py
    if new_state.is_retrying():
        msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
        requests.post("https://hooks.slack.com/services/", json={"text": msg})
    if new_state.is_failed():
        msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
        requests.post("https://hooks.slack.com/services/", json={"text": msg})
    # if new_state.is_successful():
    #     msg = f"<@XXXX> {os.path.basename(__file__)} | {task} | {new_state}"
    #     requests.post("https://hooks.slack.com/services/", json={"text": msg})
    return new_state
I would like to avoid sending a success message for each task, and instead send one only if every task ran successfully. Thanks!
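One pattern that matches this, as a sketch: attach the success notification as a flow-level state handler, which fires on flow state changes rather than once per task (the webhook URL is a placeholder):

import requests
from prefect import Flow

def flow_post_to_slack(flow, old_state, new_state):
    # Fires on flow-level state changes, so a success message is sent
    # once, only when the whole flow finishes successfully.
    if new_state.is_successful():
        msg = f"<@XXXX> {flow.name} finished successfully"
        requests.post("https://hooks.slack.com/services/", json={"text": msg})
    return new_state

with Flow("my_flow", state_handlers=[flow_post_to_slack]) as flow:
    ...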
Ben Muller
07/20/2021, 8:29 PM

Thomas Nakamoto
07/20/2021, 8:31 PM

Kevin Kho
07/20/2021, 8:31 PM

Thomas Nakamoto
07/20/2021, 8:34 PM

Kevin Kho
07/20/2021, 8:37 PM

Thomas Nakamoto
07/20/2021, 8:41 PM