Wai Kiat Tan
06/11/2021, 4:51 PM
Joe
06/11/2021, 7:34 PM
Lucas Fobian
06/11/2021, 8:06 PM
YD
06/11/2021, 9:16 PM
YD
06/11/2021, 11:23 PM
Ben Muller
06/11/2021, 11:26 PM
Thomas Nyegaard-Signori
06/14/2021, 10:01 AM
Started following logs for <task pod>
and then nothing more other than the completion and deletion log. Meanwhile, if I check the Kubernetes job pod logs (kubectl logs -n my-namespace prefect-job-...),
I see the logs of the task pod showing up nicely, just not being sent back to the Agent/Server. Does anyone have a similar experience and know of the one flag I'm missing?
joshua mclellan
06/14/2021, 3:17 PM
Fina Silva-Santisteban
06/14/2021, 6:30 PM
flow.storage = GitHub(secrets=["GITHUB_ACCESS_TOKEN"],
                      repo="my_org/repo_name",
                      path="prefect_flows/flows/flow.py",
                      ref="trunk")
The access token is saved in Prefect Cloud under the same name. During the flow run, Prefect can’t seem to find the repo:
INFO:Downloading flow from GitHub storage - repo: 'my_org/repo_name', path: 'prefect_flows/flows/flow.py', ref: 'trunk'
ERROR: Repo 'my_org/repo_name' not found. Check that it exists (and is spelled correctly), and that you have configured the proper credentials for accessing it.
ERROR: Failed to load and execute Flow's environment: UnknownObjectException(404, {'message': 'Not Found', 'documentation_url': 'https://docs.github.com/rest/reference/repos#get-a-repository'}, {'server': 'GitHub.com', 'date': 'Fri, 11 Jun 2021 22:46:06 GMT', 'content-type': 'application/json; charset=utf-8', 'x-github-media-type': 'github.v3; format=json', 'access-control-expose-headers': 'ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, Deprecation, Sunset', 'access-control-allow-origin': '*', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0', 'referrer-policy': 'origin-when-cross-origin, strict-origin-when-cross-origin', 'content-security-policy': "default-src 'none'", 'vary': 'Accept-Encoding, Accept, X-Requested-With', 'content-encoding': 'gzip', 'x-ratelimit-limit': '60', 'x-ratelimit-remaining': '59', 'x-ratelimit-reset': '1623455165', 'x-ratelimit-resource': 'core', 'x-ratelimit-used': '1', 'content-length': '112', 'x-github-request-id': 'C202:57D9:E1FC5:1D69CC:60C3E7AD'})
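One detail in the response headers above may be worth checking: 'x-ratelimit-limit' is '60', which is the hourly limit GitHub applies to unauthenticated REST requests (authenticated users normally see 5000). That would be consistent with the token not being attached to the request, in which case GitHub answers 404 for a private repo. A minimal sketch of that check follows; the helper name looks_unauthenticated is hypothetical, and the threshold assumes GitHub's default limits rather than anything the log itself confirms.

```python
def looks_unauthenticated(headers: dict) -> bool:
    """Guess from GitHub response headers whether the request carried no valid token."""
    # Assumption: 60 requests/hour is GitHub's default limit for unauthenticated calls.
    limit = headers.get("x-ratelimit-limit")
    return limit is not None and int(limit) <= 60


# Rate-limit headers copied from the error above.
headers_from_log = {
    "x-ratelimit-limit": "60",
    "x-ratelimit-remaining": "59",
    "x-ratelimit-used": "1",
}
print(looks_unauthenticated(headers_from_log))
```

If this prints True for a request that was supposed to be authenticated, the next thing to look at is how the secret is being passed to the storage, not whether the repo exists.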
I’ve curled the GitHub API endpoint repos/
directly using the same auth token, and I do get the repo’s information. Please advise! 🙏
Jeremy Phelps
06/14/2021, 7:26 PM
Task.run method? I'd like to do something like this:
class MyTask(prefect.tasks.core.function.FunctionTask):
    def run(self, *args, **kwargs):
        try:
            return super().run(*args, **kwargs)
        except BaseException as e:
            # hostname and format_error are assumed to be defined elsewhere by the poster.
            prefect.context.get('logger').error('Error on host {}: {}'.format(hostname, format_error(e)))
            raise  # Or better yet, mark the task as Failed
By default, Prefect catches, logs, and marks the task as failed for some errors, but for others, it puts str(e) in the "State Message" and throws away the stack trace, and for still others, it drops the error completely and the task appears to be "running". My wrapper would fix these problems.
But you can't write the method above because there's a function called _validate_run_signature that expressly forbids it. So the only way to do it that I can think of would be to either wait for Python to get macros, or to add code generation to my deployment process (it would insert the try/except block into the body of every function that has a @task decorator).
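For reference, a plain-Python sketch of the wrapper idea described above, written as a decorator around the function and not as a Task.run override. functools.wraps keeps the wrapped function's real parameters visible to inspect, which is the kind of information a signature check relies on. Whether Prefect's _validate_run_signature accepts a function wrapped this way and then passed to @task is an assumption and has not been verified here. The names log_errors and divide and the logger name are hypothetical.

```python
import functools
import inspect
import logging
import socket
import traceback

# Stand-in for prefect.context.get("logger"); plain stdlib logging is used here.
logger = logging.getLogger("task_errors")


def log_errors(fn):
    """Wrap fn so any exception is logged with host name and stack trace, then re-raised."""

    @functools.wraps(fn)  # sets __wrapped__, so inspect.signature() reports fn's parameters
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except BaseException:
            logger.error("Error on host %s:\n%s", socket.gethostname(), traceback.format_exc())
            raise

    return wrapper


@log_errors
def divide(x, y):
    return x / y


# Introspection sees the original parameters, not *args/**kwargs.
print(list(inspect.signature(divide).parameters))
```

The design choice is to keep the try/except in one place and apply it per function, which avoids both code generation and a variadic run method.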
Is it really impossible to write this wrapper as an ordinary method?
Verun Rahimtoola
06/14/2021, 9:06 PM
Error while deploying flow: ValidationError({'type': ['Unsupported value: UniversalRun']
with Prefect version 0.14.21
Raúl Mansilla
06/14/2021, 9:12 PM
peter zhao
06/14/2021, 9:43 PM
peter zhao
06/14/2021, 9:44 PM
peter zhao
06/14/2021, 9:45 PM
requests.exceptions.SSLError: HTTPSConnectionPool(host='opensky-network.org', port=443): Max retries exceeded with url: /api/states/all?lamin=37.15294209056667&lamax=40.75094679823332&lomin=-79.76136868215879&lomax=-75.13474242904122 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1129)')))
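"unable to get local issuer certificate" generally means the client could not build a chain to a trusted CA from the certificates it has. A small diagnostic sketch, assuming only the standard library, that lists where this Python environment would look for CA certificates. requests normally verifies against its bundled certifi file unless REQUESTS_CA_BUNDLE or CURL_CA_BUNDLE is set, so those variables are reported too. The function name describe_ca_locations is hypothetical, and this only gathers information; it does not fix anything.

```python
import os
import ssl


def describe_ca_locations() -> dict:
    """Report the CA locations and override variables visible to this Python process."""
    paths = ssl.get_default_verify_paths()
    return {
        # Defaults compiled into the OpenSSL build that Python is linked against.
        "cafile": paths.cafile,
        "cafile_exists": bool(paths.cafile and os.path.isfile(paths.cafile)),
        "capath": paths.capath,
        "capath_exists": bool(paths.capath and os.path.isdir(paths.capath)),
        # Environment variables that requests consults before falling back to certifi.
        "REQUESTS_CA_BUNDLE": os.environ.get("REQUESTS_CA_BUNDLE"),
        "CURL_CA_BUNDLE": os.environ.get("CURL_CA_BUNDLE"),
    }


for key, value in describe_ca_locations().items():
    print(f"{key}: {value}")
```

Running this inside the same container or environment as the flow shows whether an override variable points at a missing or outdated bundle.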
Scott Vermillion
06/15/2021, 12:36 AM
ERROR - <name> | Failed to query for flow run metadata
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/agent/agent.py", line 324, in _submit_deploy_flow_run_jobs
    flow_runs = self._get_flow_run_metadata(flow_run_ids)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/agent/agent.py", line 684, in _get_flow_run_metadata
    result = self.client.graphql(query)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/prefect/client/client.py", line 319, in graphql
    raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['flow_run', 0, 'id'], 'message': 'Cannot return null for non-nullable field flow_run.id.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
I rebuilt my agent (was planning to anyway) but got the same error right away upon launching. I deleted and reinitialized my project. It doesn’t seem like I’m doing anything differently than before, when the agent appeared happy and my issue was further down the line. Really mystified right now, but I’m a bit frazzled from working on it for so many hours… maybe I’m missing something obvious? (I’m also not the least bit experienced with Prefect, nor very much with Airflow.)
Prabin Mehta
06/15/2021, 12:00 PM
Thomas Nyegaard-Signori
06/15/2021, 12:50 PM
azure storage? I had a look at comparing the s3 implementation (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/storage/s3.py#L159) to the azure one (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/storage/azure.py#L128), and it seems like the stored_as_script flag doesn't do much on the azure storage. Can anyone from the Prefect team confirm what I am seeing, or am I on the wrong track?
Tadas
06/15/2021, 1:30 PM
Milly gupta
06/15/2021, 1:31 PM
Wai Kiat Tan
06/15/2021, 3:32 PM
ciaran
06/15/2021, 3:48 PM
prefect agent kubernetes in a custom Docker image?
I can get my image running on AWS Fargate with:
command=[
    "prefect",
    "agent",
    "ecs",
    "start",
    "--agent-address",
    "http://:8080",
    "--cluster",
    cluster.cluster_arn,
    "--task-role-arn",
    ecs_task_role.role_arn,
],
But on AKS it's a whole different story: I can't get it running 😭 Essentially it can't find prefect, but I can run the container locally and invoke prefect
Zach Schumacher
06/15/2021, 5:04 PM
Zach Schumacher
06/15/2021, 5:46 PM
flow.schedule = CronSchedule("30 12 * * *")
Kathryn Klarich
06/15/2021, 7:59 PM
Justin Liu
06/15/2021, 8:25 PM
Matthew Blau
06/15/2021, 8:30 PM
Jason Prado
06/15/2021, 10:57 PM
Ben Muller
06/16/2021, 7:21 AM
Timo
06/16/2021, 7:21 AM
Timo
06/16/2021, 7:21 AM
Kevin Kho
06/16/2021, 1:36 PM
Timo
06/16/2021, 2:17 PM
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import State, Success

def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new

@task
def say_hello(name):
    print(f"Hello {name}")

@task(state_handlers=[stateh])
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

with Flow("hello_flow") as flow:
    ls = [1, 2, 3]
    sh = say_hello("John")
    e = forerror.map(ls)

if __name__ == "__main__":
    flow.run()
Kevin Kho
06/16/2021, 2:18 PM
ENDRUN instead of fail when para == 2
Timo
06/16/2021, 2:21 PM
from prefect import task, Flow
from prefect.engine.signals import FAIL, ENDRUN
from prefect.engine.state import Failed, State

def stateh(obj, old, new: State):
    if new.is_failed():
        raise ENDRUN(new)
    return new

@task
def say_hello(name):
    print(f"Hello {name}")

# @task(state_handlers=[stateh])
@task
def forerror(para):
    if para == 2:
        # raise FAIL("it's 2")
        state = Failed("it's 2")
        raise ENDRUN(state)
    else:
        print(para)
        return para + 1

with Flow("hello_flow") as flow:
    ls = [1, 2, 3]
    sh = say_hello("John")
    e = forerror.map(ls)

if __name__ == "__main__":
    flow.run()
[2021-06-16 16:20:11+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'hello_flow'
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-06-16 16:20:11+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Scheduled to Running
[2021-06-16 16:20:11+0200] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Pending to Running
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Calling task.run() method...
Hello John
[2021-06-16 16:20:11+0200] DEBUG - prefect.TaskRunner | Task 'say_hello': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror': Handling state change from Pending to Mapped
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror': Finished task run for task with final state: 'Mapped'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Calling task.run() method...
1
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[0]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[0]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Calling task.run() method...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[1]': Handling state change from Running to Failed
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[1]': Finished task run for task with final state: 'Failed'
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Starting task run...
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Pending to Running
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Calling task.run() method...
3
[2021-06-16 16:20:12+0200] DEBUG - prefect.TaskRunner | Task 'forerror[2]': Handling state change from Running to Success
[2021-06-16 16:20:12+0200] INFO - prefect.TaskRunner | Task 'forerror[2]': Finished task run for task with final state: 'Success'
[2021-06-16 16:20:12+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
[2021-06-16 16:20:12+0200] DEBUG - prefect.FlowRunner | Flow 'hello_flow': Handling state change from Running to Failed
Kevin Kho
06/16/2021, 2:22 PM
Timo
06/17/2021, 5:39 AM
ls = get_list() instead of ls = [1,2,3]). I receive a "Task is not iterable" error if I use the output of the get_list() task.
from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect

LOGGER = prefect.context.get("logger")

@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name

@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

@task
def get_list():
    return [1, 2, 3]

with Flow("hello_flow") as flow:
    # ls = get_list()
    ls = [1, 2, 3]
    h_tasks = [say_hello("John") for x in ls]
    e_tasks = [forerror(para=x) for x in ls]
    for i in range(0, len(ls)):
        e_tasks[i].set_upstream(h_tasks[i])
        if i > 0:
            e_tasks[i].set_upstream(e_tasks[i - 1])

if __name__ == "__main__":
    flow.run()
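To make the dependency structure built by the set_upstream loop above easier to see, here is a plain-Python sketch (no Prefect involved) that records the same edges and prints one valid execution order. The node labels such as hello[0] and the function build_dependencies are hypothetical names used only for illustration, and graphlib requires Python 3.9 or newer.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def build_dependencies(n: int) -> dict:
    """Map each node to the set of nodes it waits for, mirroring the set_upstream loop."""
    deps = {}
    for i in range(n):
        deps[f"hello[{i}]"] = set()
        # Mirrors e_tasks[i].set_upstream(h_tasks[i])
        deps[f"forerror[{i}]"] = {f"hello[{i}]"}
        if i > 0:
            # Mirrors e_tasks[i].set_upstream(e_tasks[i - 1])
            deps[f"forerror[{i}]"].add(f"forerror[{i - 1}]")
    return deps


deps = build_dependencies(3)
order = list(TopologicalSorter(deps).static_order())
print(order)
```

The chain forerror[0], forerror[1], forerror[2] is what forces the sequential behaviour; the hello nodes have no ordering among themselves.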
Kevin Kho
06/17/2021, 1:53 PM
from prefect import task, Flow
from prefect.engine.signals import FAIL
import prefect

LOGGER = prefect.context.get("logger")

@task
def say_hello(name):
    LOGGER.info(f"Hello {name}")
    return name

@task
def forerror(para):
    if para == 2:
        raise FAIL("it's 2")
    else:
        print(para)
        return para + 1

@task
def get_list():
    return [1, 2, 3]

@task
def helper(ls):
    for x in ls:
        say_hello.run("John")
        y = forerror.run(para=x)
    return y

with Flow("hello_flow") as flow:
    ls = get_list()
    helper(ls)

flow.run()
Timo
06/18/2021, 5:56 AM
run() (which totally makes sense because it's all Python)... The downside of this approach is that I can't monitor each "sub" task, as you said.
Therefore implementing Task Looping would be great. Could I reuse existing "official" Prefect tasks within a Task Looping construct? The example in the docs shows only custom tasks (with the @task decorator) raising the LOOP signal.
Another question I have: could I use the map function with each or one of the constructs? E.g. I have a list of files which is split by day ([[file1-day1.zip,file2-day1.zip,....,fileN-day1.zip], [file1-day2.zip,file2-day2.zip,....,fileN-day2.zip]]). Now I'd like to iterate over the sequence, but within the sequence I'd like to use map to extract all zip files with the Unzip task. Currently I get ValueError: Could not infer an active Flow context.
As I discovered, I could use StartFlowRun to start another flow which implements the mapping routine. But this can't be tested locally, as StartFlowRun only works with Cloud or Server.
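The control flow described here (day batches handled strictly one after another, with a fan-out over the files inside each batch) can be sketched in plain Python. This is not Prefect API and says nothing about how to express it in a flow; unzip is a hypothetical stand-in for the Unzip task, and the file names are shortened versions of the ones in the message.

```python
from concurrent.futures import ThreadPoolExecutor


def unzip(name: str) -> str:
    """Hypothetical stand-in for the Unzip task: strip the .zip suffix instead of extracting."""
    return name[: -len(".zip")] if name.endswith(".zip") else name


def process_days(days: list) -> list:
    """Handle the day batches in order, mapping over the files within each batch."""
    results = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for batch in days:  # outer level: sequential, one day at a time
            # Inner level: mapped; list() waits for the whole batch before moving on.
            results.append(list(pool.map(unzip, batch)))
    return results


days = [
    ["file1-day1.zip", "file2-day1.zip"],
    ["file1-day2.zip", "file2-day2.zip"],
]
print(process_days(days))
```

The point of the sketch is the shape: an ordered outer loop whose body is a map, which is the combination the question is asking whether Prefect can express and monitor.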