Tomás Emilio Silva Ebensperger
06/11/2021, 1:32 AM
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
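For reference, a hedged sketch of configuring the token outside the CLI (config key names from the Prefect 0.x docs; the token value is a placeholder):

import os
# Prefect 0.x reads the Cloud auth token from config or environment:
# either add auth_token under [cloud] in ~/.prefect/config.toml, or set
os.environ["PREFECT__CLOUD__AUTH_TOKEN"] = "<YOUR_API_TOKEN>"  # placeholder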
Michael Hadorn
06/11/2021, 9:06 AM
Lukas N.
06/11/2021, 9:30 AM
After 4/10 iterations of the task, the result contains the output of the 4th iteration, which is also the input to the 5th one. If the 5th one fails (in my case the process dies and stops sending 💓), the ZombieKiller restarts the task, but it ignores the result and starts from the 1st iteration! This doesn't happen if I restart it from the UI; there it correctly picks up the result and continues with the 5th iteration.
Andrew Moist
06/11/2021, 4:04 PM
CM
06/11/2021, 8:29 PM
Ayla Khan
06/14/2021, 6:18 PM
Is there a way to pass Dask logging configuration through the LocalDaskExecutor? I tried running this code but haven't seen any output from Dask in Prefect Cloud:
with Flow(
    "simple_aggregate_flow",
    executor=LocalDaskExecutor(
        scheduler="processes",
        **{"distributed.logging.distributed": "debug"},
    ),
) as simple_aggregate_flow:
    ...
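One hedged observation with a sketch: LocalDaskExecutor runs on Dask's local schedulers rather than distributed, so a distributed.* logging key may simply be ignored there. Two alternatives worth trying (the config key is copied from the snippet above and may only matter for distributed-backed executors; the logger name is Dask's standard one):

import logging
import dask

# Set the Dask config key globally instead of via executor kwargs
# (meaningful for distributed-backed executors such as DaskExecutor).
dask.config.set({"distributed.logging.distributed": "debug"})

# Or configure the stdlib logger that Dask's scheduler/workers emit to.
logging.getLogger("distributed").setLevel(logging.DEBUG)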
Hugo Kitano
06/14/2021, 6:46 PM
Pawel
06/14/2021, 7:17 PM
from prefect.engine.results import GCSResult
from prefect import task, Flow

@task(
    log_stdout=True,
    result=GCSResult(
        bucket="prefect-cloud-results",
        location="{flow_name}/{flow_run_id}/{task_slug}/{task_run_id}_dummybenchmark.pkl",
    ),
)
def compute_benchmark() -> float:
    return 0.753

with Flow("test_gcs") as flow:
    r = compute_benchmark()

x = flow.run()
When running in Prefect Cloud, the result is written to the specified GCS location and the logs show as much, but when running the same flow on my local machine, the flow runs successfully and no attempt is made to write to GCS. It's not an access issue, as I can manually call GCSResult.write(...) with success. Is the GCSResult decorator only expected to work for Prefect Cloud runs, or am I missing some setting?
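A hedged guess at the cause, with a sketch (setting name from the Prefect 0.x results docs): result persistence is gated on checkpointing, which Cloud/Server-orchestrated runs enable automatically but a bare local flow.run() does not:

import os

# Enable checkpointing for local runs so Result objects get written;
# set this before the flow runs (or export it in the shell).
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "true"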
ash
06/14/2021, 9:56 PM
@task
def say_hello():
    print("hello world !!!")

with Flow("hello world") as flow:
    say_hello()

flow.storage = Docker(registry_url="", image_name="hello-world-flow")
flow.register("demo")
Can someone please explain to me:
1.) Why do we use a container registry for storing a flow, and what difference does it make compared to script-based storage like GitHub?
2.) When we execute a flow whose server is on K8s but no executor is defined, as in the snippet above, where is the flow executed: on the local machine or in a pod on the cluster?
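A hedged sketch relevant to question 2 (class and parameter from the Prefect 0.x run-config docs; the image name is a placeholder): with a Kubernetes agent, each flow run executes in its own pod, and the run config picks the image; the executor only controls parallelism inside that pod, and with none set, the single-threaded LocalExecutor is used there:

from prefect.run_configs import KubernetesRun

# The agent creates a pod per flow run; the flow executes in that pod,
# not on the machine that registered it.
flow.run_config = KubernetesRun(image="<registry>/hello-world-flow:latest")  # placeholder image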
Mehdi Nazari
06/14/2021, 10:47 PM
Emma Rizzi
06/15/2021, 8:04 AM
Tom Forbes
06/15/2021, 1:11 PM
if running_locally:
    flow.executor = LocalDaskExecutor()
else:
    flow.executor = DaskExecutor(cluster_class=lambda: KubeCluster(pod_template=...))
Obviously the second DaskExecutor won't work locally, which is a shame. I can't find any supported way of checking whether we're serializing this for production rather than debugging it locally.
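A hedged workaround sketch (PREFECT_TARGET_ENV is a made-up variable name, not a Prefect convention): key the executor choice off something that is only set in the environment that registers for production:

import os
from prefect.executors import DaskExecutor, LocalDaskExecutor
from dask_kubernetes import KubeCluster  # as in the snippet above

# Prefect doesn't expose an "is this being registered for production?"
# flag, so gate on a variable set only in the CI/registration step.
if os.environ.get("PREFECT_TARGET_ENV") == "production":
    flow.executor = DaskExecutor(cluster_class=lambda: KubeCluster(pod_template=...))
else:
    flow.executor = LocalDaskExecutor()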
Tom Forbes
06/15/2021, 2:54 PM
Is there a way to get the Result class/instance for a given task within the task itself?
Tom Forbes
06/15/2021, 5:16 PM
When using flow.run(), is it possible to override the result class/type? Doing:
flow.run(result=LocalResult(...))
fails with:
TypeError: run() got an unexpected keyword argument 'result'
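A hedged alternative sketch (attribute from the Prefect 0.x results docs; the directory is a placeholder): set the result on the flow object, which its tasks inherit, instead of passing it to run():

from prefect.engine.results import LocalResult

# flow.run() takes no result kwarg, but the flow carries a default
# Result that its tasks fall back to.
flow.result = LocalResult(dir="/tmp/prefect-results")  # placeholder dir
flow.run()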
Raúl Mansilla
06/16/2021, 4:32 PM
Paulo Benatto
06/17/2021, 8:59 AM
@task
def create_task_list(ids):
    l = []
    for i in ids:
        l.append(ReferralUID())
    return l

@task
def generate_ids():
    return ["id1", "id2", "id3"]

with Flow("parallel-execution") as flow:
    ids = generate_ids()
    list_of_tasks = create_task_list(ids)
    request_referral = RequestReferralDetail()
    # list_of_tasks should be a list, but I'm returning a task. How do I inject a list on bind?
    request_referral.bind(request_details=list_of_tasks, flow=flow)

flow.visualize()
flow.run(executor=LocalDaskExecutor())
Thanks
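A hedged suggestion with a sketch (mapping from the Prefect 0.x docs; it assumes RequestReferralDetail accepts a single request_details item per call): Task.map() fans a task out over an upstream list without building Task objects by hand:

with Flow("parallel-execution") as flow:
    ids = generate_ids()
    request_referral = RequestReferralDetail()
    # One task run per element of ids; the executor runs them in parallel.
    results = request_referral.map(request_details=ids)

flow.run(executor=LocalDaskExecutor())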
Chris Bowen
06/17/2021, 4:22 PM
I can run prefect server start --detach so it doesn't run interactively. Is there a comparable solution for agents? When my SSH session with the server ends, so does my agent. I might just be missing something obvious. Appreciate any help.
YD
06/17/2021, 7:25 PM
I'm not sure how to set up newuidmap and newgidmap, and it's also not so clear what exactly I should add to /etc/subuid and /etc/subgid. If someone can help with a step-by-step example, it will be very helpful.
Thanks
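For what it's worth, entries in /etc/subuid and /etc/subgid follow the standard shadow-utils format user:start:count; a commonly used (but not required) example line, with the username as a placeholder:

# /etc/subuid and /etc/subgid, one line per user: <user>:<start>:<count>
youruser:100000:65536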
YD
06/17/2021, 7:42 PM
Shaoyi Zhang
06/17/2021, 8:07 PM
YD
06/17/2021, 8:54 PM
When running prefect server start, it stops when I exit the terminal. I tried sudo systemctl prefect server start, but that does not look like the proper way.
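A hedged sketch of one conventional approach (the unit file path, name, and paths inside it are assumptions to adapt): wrap the long-running process in a systemd service instead of invoking it ad hoc; the same pattern covers the agent question above:

# /etc/systemd/system/prefect-agent.service (hypothetical path/name)
[Unit]
Description=Prefect agent
After=network.target

[Service]
User=youruser
ExecStart=/usr/local/bin/prefect agent local start
Restart=always

[Install]
WantedBy=multi-user.target

Then enable it with systemctl enable --now prefect-agent so it survives logout and reboots.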
06/17/2021, 9:28 PM<VM ip address>:8080
, without running anything, it is somehow shows the workflows and projects I had on my local machine.
How can this be ?ash
06/18/2021, 12:08 PM
import pymongo
import pandas
import sklearn
from reports.config import mongo_config
from prefect import task, Flow
from prefect.storage import Docker

@task
def say_hello():
    print("hello world !!!")

with Flow("hello world") as flow:
    say_hello()

flow.storage = Docker(registry_url="", image_name="hello-world-flow")
flow.register("demo")
In the above code I am importing three external libraries (pandas, sklearn, pymongo) plus mongo_config, which contains configuration for connecting to Mongo.
When I register a flow, let's say for the code above:
A.) step (1) A Docker image containing everything, including the external libraries, the Mongo config, and the flow code, is built and pushed to the container registry.
step (2) Its metadata, including a schedule (if any) and the path to its image in the container registry, is saved to Postgres.
step (3) When the Kubernetes agent polls and has to run the above flow, it creates a pod from that image (so dependencies are already installed), and after the run completes the pod is terminated.
That's my understanding of how things work; please correct me if I am wrong on any of the above.
Now, one thing here: what if the Mongo config changes? Whenever we build pipelines for reporting, all we want is to change the config in one place and have the change incorporated into every report. But with the above approach I might need to re-register every flow for it to pick up the updated config. Can you suggest some ways to change the config in one place so that all the flows see it and I don't have to re-register them?
B.) One approach I think could solve this is using GitHub as storage: since the code is read from GitHub, the updated config would presumably be picked up too. But there is one issue with this approach: when the pod is created to run the script, how will the dependencies be installed on the pod, since we don't have a Docker image this time?
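A hedged sketch of one way to avoid re-registering (Secret class from the Prefect 0.x API; the secret name is a placeholder): resolve the Mongo config at run time instead of baking it into the image:

from prefect import task
from prefect.client import Secret

@task
def get_mongo_config():
    # Resolved on the pod at run time, so updating the secret changes
    # behaviour for every flow without re-registering any of them.
    return Secret("MONGO_CONFIG").get()  # placeholder secret name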
Kamil Gorszczyk
06/20/2021, 3:23 PM
executeShell = ShellTask(
    task_run_name=lambda **kwargs: f"{kwargs['command'].split('-file ')[1].split(' -')[0].rpartition('/')[-1]}",
    stream_output=True,
    max_retries=RETRY_MAX,
    retry_delay=timedelta(minutes=RETRY_TIMEOUT),
    timeout=TIMEOUT,
    state_handlers=[taskStateHandler],
)
def taskStateHandler(obj: Task, old_state: State, new_state: State):
    try:
        if new_state.is_retrying():
            sendTelegramNotification("Task {0} failed and is retrying at {1}".format(obj.task_run_name, new_state.start_time))
        return new_state
    except:
        sendTelegramNotification("Task {0} failed with an exception".format(obj.name))
        return new_state
Since task_run_name is a lambda function, I just called task.task_run_name() to get the resolved name. But somehow, since a week or so ago, Prefect raises the following exception:
Exception raised while calling state handlers: KeyError('command')
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/cloud/task_runner.py", line 64, in call_runner_target_handlers
new_state = super().call_runner_target_handlers(
File "/usr/local/lib/python3.8/dist-packages/prefect/engine/task_runner.py", line 114, in call_runner_target_handlers
new_state = handler(self.task, old_state, new_state) or new_state
File "prefect-master.py", line 200, in taskStateHandler
File "prefect-master.py", line 222, in <lambda>
KeyError: 'command'
I just can't figure out what I'm missing or doing wrong… I even reverted to 0.14.19, where I thought it worked, but got the same exception.
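A hedged guess with a sketch (the context key name is an assumption): inside a state handler, obj.task_run_name is still the unresolved callable, and invoking it without the command kwarg raises exactly this KeyError; reading the resolved name from Prefect's context avoids calling the lambda at all:

import prefect

def taskStateHandler(obj, old_state, new_state):
    # Use the already-resolved run name from context instead of calling
    # the task_run_name lambda by hand; fall back to the task's name.
    run_name = prefect.context.get("task_run_name", obj.name)
    if new_state.is_retrying():
        sendTelegramNotification("Task {0} failed and is retrying at {1}".format(run_name, new_state.start_time))
    return new_state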
Robert Hales
06/21/2021, 1:54 PM
CM
06/21/2021, 3:57 PM
Matheus Cruz
06/21/2021, 8:28 PM
State Message:
Failed to load and execute Flow's environment: UnknownObjectException(404, {'message': 'Not Found', 'documentation_url': 'https://docs.github.com/rest/reference/repos#get-repository-content'}, {'server': 'GitHub.com', 'date': 'Mon, 21 Jun 2021 20:19:24 GMT', 'content-type': 'application/json; charset=utf-8', 'transfer-encoding': 'chunked', 'x-oauth-scopes': 'admin:enterprise, admin:gpg_key, admin:org, admin:org_hook, admin:public_key, admin:repo_hook, delete:packages, delete_repo, gist, notifications, repo, user, workflow, write:discussion, write:packages', 'x-accepted-oauth-scopes': '', 'x-github-media-type': 'github.v3; format=json', 'x-ratelimit-limit': '5000', 'x-ratelimit-remaining': '4988', 'x-ratelimit-reset': '1624309007', 'x-ratelimit-used': '12', 'x-ratelimit-resource': 'core', 'access-control-expose-headers': 'ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, Deprecation, Sunset', 'access-control-allow-origin': '*', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0', .....
I've tried a lot of things and nothing seems to work. Do you know what it could be?
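A hedged checklist in code form (parameters from the Prefect 0.x GitHub storage docs; all values are placeholders): a 404 here usually means the registered repo slug, ref, or file path no longer matches what's on GitHub:

from prefect.storage import GitHub

flow.storage = GitHub(
    repo="org/repo",          # full "owner/name" slug
    path="flows/my_flow.py",  # path to the flow file within the repo
    ref="main",               # branch/tag that actually contains that file
)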
Aiden Price
06/22/2021, 2:48 AM
Roey Brecher
06/22/2021, 3:42 PM
Devin McCabe
06/22/2021, 3:59 PM
(42036d0b-6443-4648-8c93-50c2ac6a360c)
nicholas
06/22/2021, 4:20 PM
Devin McCabe
06/22/2021, 4:25 PM
nicholas
06/22/2021, 4:27 PM
start_time and end_time - if the run is still in a Cancelling state, it hasn't finished from what Cloud can tell, even if nothing is running. You can manually set this state to Finished or Failed to cap this.
Devin McCabe
06/22/2021, 4:29 PM
nicholas
06/22/2021, 4:30 PM