Shruti Hande
04/17/2023, 10:41 AM
09:09:17.306 | DEBUG | httpx - load_ssl_context verify=True cert=None trust_env=True http2=False
09:09:17.306 | DEBUG | httpx - load_verify_locations cafile='/root/codes/the-swift-turtle/venv/lib/python3.8/site-packages/certifi/cacert.pem'
09:09:17.317 | DEBUG | httpcore - connection.connect_tcp.started host='orion.my_url.com' port=4200 local_address=None timeout=30.0
09:09:17.321 | DEBUG | httpcore - connection.connect_tcp.complete return_value=<httpcore.backends.asyncio.AsyncIOStream object at 0x7fe97444aca0>
09:09:17.321 | DEBUG | httpcore - http11.send_request_headers.started request=<Request [b'GET']>
09:09:17.322 | DEBUG | httpcore - http11.send_request_headers.complete
09:09:17.322 | DEBUG | httpcore - http11.send_request_body.started request=<Request [b'GET']>
09:09:17.322 | DEBUG | httpcore - http11.send_request_body.complete
09:09:17.322 | DEBUG | httpcore - http11.receive_response_headers.started request=<Request [b'GET']>
09:09:17.333 | DEBUG | httpcore - http11.receive_response_headers.complete return_value=(b'HTTP/1.1', 404, b'Not Found', [(b'date', b'Mon, 17 Apr 2023 09:09:16 GMT'), (b'server', b'uvicorn'), (b'content-length', b'33'), (b'content-type', b'application/json')])
09:09:17.333 | DEBUG | httpx - HTTP Request: GET http://orion.my_url.com:4200/api/deployments/name/mp-meta-data-flow/mpdeployment-1 "HTTP/1.1 404 Not Found"
09:09:17.334 | DEBUG | httpcore - http11.receive_response_body.started request=<Request [b'GET']>
09:09:17.334 | DEBUG | httpcore - http11.receive_response_body.complete
09:09:17.334 | DEBUG | httpcore - http11.response_closed.started
09:09:17.334 | DEBUG | httpcore - http11.response_closed.complete
09:09:17.335 | DEBUG | httpcore - connection.close.started
09:09:17.335 | DEBUG | httpcore - connection.close.complete
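A 404 from /api/deployments/name/<flow_name>/<deployment_name> means the server has no deployment registered under that exact flow/deployment pair. A minimal check from Python, assuming a Prefect 2 client pointed at the same server and that the names below match the URL above:

import asyncio
from prefect import get_client

async def check():
    async with get_client() as client:
        # Raises ObjectNotFound (the 404 above) if no deployment matches.
        dep = await client.read_deployment_by_name("mp-meta-data-flow/mpdeployment-1")
        print(dep.id)

asyncio.run(check())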
#prefect-community #prefect-ui #prefect-contributors
Emma Rizzi
04/17/2023, 1:08 PM Does `push: True` work with a private registry?
Right now, my flow is built and pushed, and the flow.py is inside the Docker image. I tried to configure the pull step with prefect.projects.steps.set_working_directory, but I get:
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flow'
It seems that the pull step is executed outside of the Docker container, so what should the pull step be for a project building a Docker image containing all the files needed?
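For reference, a minimal sketch of the pull section in prefect.yaml, using the step module path cited above and the in-image path from the error; whether this step runs inside the container depends on the worker type:

pull:
  - prefect.projects.steps.set_working_directory:
      directory: /opt/prefect/flow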
Felipe Ventura
04/17/2023, 2:21 PM
Emil Ordoñez
04/17/2023, 3:00 PM
2023-04-16 08:09:12,613 - distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 5
2023-04-16 08:09:49,807 - distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 6
2023-04-16 08:10:29,194 - distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 7
2023-04-16 08:21:04,785 - distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 43
2023-04-16 08:21:08,311 - distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 12
2023-04-16 08:21:10,879 - distributed.scheduler - WARNING - Worker tried to connect with a duplicate name: 14
I think the previous one is the most explanatory error: it suggests that prefect-dask is repeating worker names. This may be preventing Dask workers from registering with the scheduler, and all the workers that failed to register didn't stop until I saw them and stopped them manually.
I'm getting these messages in the workers:
2023-04-16 08:09:12,614 - distributed.worker - ERROR - Unable to connect to scheduler: name taken, 5
2023-04-16 08:09:12,614 - distributed.worker - INFO - Stopping worker at <tcp://172.31.39.118:34983>. Reason: worker-close
2023-04-16 08:10:11,023 - distributed.nanny - INFO - Closing Nanny at '<tcp://172.31.39.118:44953>'. Reason: nanny-close
but they're not exiting; I have to stop them manually.
I've just discovered those warnings on the scheduler, so that may give us a pretty good hint at the actual cause of the issue.
I'm using:
prefect-dask==0.2.3
dask-cloudprovider[aws]==2022.10.0
prefect version is 2.8.7
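For anyone debugging the same thing, a quick way to see which worker names the scheduler currently holds, assuming network access to the scheduler (the address below is a placeholder):

from distributed import Client

client = Client("tcp://<scheduler-address>:8786")
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info.get("name"))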
datamongus
04/17/2023, 3:11 PM
Federico Zambelli
04/17/2023, 3:21 PM I'm running into RuntimeError: There is no current event loop in thread 'WorkerThread-0'. My understanding is that Prefect runs its own event loop in the main thread, and flows are executed in a separate worker thread.
I can't seem to figure out a workaround for the issue.
For the record, my code (tl;dr) is as follows:
import asyncio

import asyncpraw
from prefect import flow, task

@task
def get_submission_from_ids(ids):
    reddit = asyncpraw.Reddit(...)
    return reddit.info(fullnames=ids)

@flow
async def get_data():
    sub_gen = get_submission_from_ids([...])  # this is an async generator
    submissions = [sub async for sub in sub_gen]

if __name__ == '__main__':
    asyncio.run(get_data())
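One possible workaround, untested here: make the task itself async so the generator is consumed on the event loop the task runs under, and return a plain list instead of an async generator:

import asyncpraw
from prefect import task

@task
async def get_submissions_from_ids(ids):
    reddit = asyncpraw.Reddit(...)  # credentials elided, as above
    # Consume the async generator inside the task and return a list.
    return [sub async for sub in reddit.info(fullnames=ids)]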
Slackbot
04/17/2023, 3:32 PM
Tanishq Hooda
04/17/2023, 4:47 PM
David Anderson
04/17/2023, 5:04 PM
Matthew Scanlon
04/17/2023, 8:01 PM Has anyone seen a flow get stuck after logging Downloading flow code from storage at '<deployment name>'?
I have a rather complex flow that crashes, but instead of exiting it hangs, and this is the last (most recent) thing I can see in the logs. This usually occurs at the beginning of the flow, though.
Today, however, I saw this on a simple flow that failed because of a kwarg error when passing a value to a pydantic model. Any thoughts on why this would occur twice? Any insight into what is actually going on under the hood would help me debug this.
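One way to see more of what the engine is doing under the hood: PREFECT_LOGGING_LEVEL is a standard Prefect 2 setting, and setting it to DEBUG surfaces internal engine logs like the httpx/httpcore lines at the top of this thread:

import os

# Must be set in the environment before the flow run starts
# (equivalently: prefect config set PREFECT_LOGGING_LEVEL=DEBUG).
os.environ["PREFECT_LOGGING_LEVEL"] = "DEBUG"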
Choenden Kyirong
04/17/2023, 9:37 PM I'm using S3 to persist the results of certain tasks, due to them producing large amounts of data.
However, I'm running into an issue when trying to use `persist_result=True` and `result_storage=S3(bucket_path=<bucket_path>)`. I'm following along with the documentation and doing the following:
...
from prefect.filesystems import S3
...

@task(name="<task_name>", persist_result=True,
      result_storage_key="<key_name>")
def task_1(...):
    ...

@flow(task_runner=DaskTaskRunner, persist_result=True,
      result_storage=S3(bucket_path="<bucket_path>"),
      name="<name>")
def flow(...):
    ...
However, I'm running into this error when task_1 is running:
Crash detected! Execution was interrupted by an unexpected exception: botocore.exceptions.NoCredentialsError: Unable to locate credentials
I'm wondering if I'm using this correctly. I have registered the S3 block with the proper credentials, and the documented example does not show a need to load the block; it just uses it like the above. It seems as though the call to S3 is unable to identify or locate any credentials. Any advice or help would be greatly appreciated!
Choenden Kyirong
04/17/2023, 10:33 PM Update: this was resolved by using `result_storage="block_type/block_name"` (the storage block slug) instead of `result_storage=S3(...)` as described in the documentation:
@flow(task_runner=DaskTaskRunner, persist_result=True,
      result_storage="block_type/block_name")
It now correctly persists results into S3. I wonder if this is an error in the documentation, or if I was incorrectly following the approach described in the docs.
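Another pattern that should be equivalent, assuming the block was saved under a name like "my-results" (a placeholder): load the block explicitly and pass the object instead of constructing S3(...) inline:

from prefect import flow
from prefect.filesystems import S3

s3_block = S3.load("my-results")  # Block.load is the standard loader

@flow(persist_result=True, result_storage=s3_block)
def my_flow():
    ...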
Simon Rascovsky
04/18/2023, 1:46 AM I've been trying out the variables functionality in Prefect 2.10.4, and I have a (stupid?) question… How do you set them from inside a task or flow with Python? I can't find any reference to setting variables, only retrieving them, and the only ways to set them seem to be through the UI or the REST API. Is this correct? Or am I missing something obvious?
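For what it's worth, variables can apparently be created over the REST API; the route and payload below (POST /api/variables with a name/value body) are an assumption to verify against the server's API reference:

import httpx

resp = httpx.post(
    "http://127.0.0.1:4200/api/variables/",  # local server; adjust the base URL
    json={"name": "my_var", "value": "hello"},
)
resp.raise_for_status()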
flapili
04/18/2023, 7:45 AM
ober
04/18/2023, 7:45 AM
from prefect import flow, task

def node_a():
    var = f" node_a "
    return var

@task
def start():
    return node_a()

@task
def single():
    return node_a()

@task
def node_b(i):
    var = f"{i} node_b "
    return var

@task
def node_c(i, j):
    var = f"{i}-{j} node_c "
    return var

@flow
def part():
    x = single()
    a = start()
    b = node_b(a)
    c = node_c.submit(x, b, wait_for=[x, b])
    return

@flow(name="complex_dag")
def main():
    ret = part()
    print(ret)
ober
04/18/2023, 7:47 AM
Kyle Hoffman
04/18/2023, 11:50 AM
Tomas Moreno
04/18/2023, 2:41 PM
Failed to load and execute Flow's environment: GithubException(500, None, {'server': 'GitHub.com', 'date': 'Tue, 18 Apr 2023 09:31:20 GMT', 'content-type': 'application/json; charset=utf-8', 'content-length': '0', 'x-oauth-scopes': 'repo', 'x-accepted-oauth-scopes': '', 'x-github-media-type': 'github.v3; format=json', 'x-github-api-version-selected': '2022-11-28', 'x-ratelimit-limit': '5000', 'x-ratelimit-remaining': '4983', 'x-ratelimit-reset': '1681812079', 'x-ratelimit-used': '17', 'x-ratelimit-resource': 'core', 'access-control-expose-headers': 'ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, X-GitHub-SSO, X-GitHub-Request-Id, Deprecation, Sunset', 'access-control-allow-origin': '*', 'strict-transport-security': 'max-age=31536000; includeSubdomains; preload', 'x-frame-options': 'deny', 'x-content-type-options': 'nosniff', 'x-xss-protection': '0', 'referrer-policy': 'origin-when-cross-origin, strict-origin-when-cross-origin', 'content-security-policy': "default-src 'none'", 'vary': 'Accept-Encoding, Accept, X-Requested-With', 'x-github-request-id': '95E0:996D:446E:471F:643E6367'})
Is there any good way to retry these failures? If we put a flow-level retry in place but the flow fails to download, is Prefect aware of the retries in the flow?
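The error format looks like Prefect 1, where retries are configured per task and would not cover a failure to load flow storage. In Prefect 2, a flow-level retry is a documented kwarg, though whether it applies to code-download failures (as opposed to exceptions raised inside the flow) isn't confirmed here:

from prefect import flow

@flow(retries=2, retry_delay_seconds=60)
def my_flow():
    ...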
Jason Motley
04/18/2023, 2:47 PM
Matt Lampe
04/18/2023, 3:22 PM
raise LOOP(message=f"Status Run {n}={v_run_code}", result=dict(n=n + 1, code=v_run_code))
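For context, this is the Prefect 1 LOOP pattern: a task raises LOOP to re-run itself, reading loop state from prefect.context (task_loop_count and task_loop_result are the documented keys). A sketch, with check_status as a hypothetical stand-in:

import prefect
from prefect import task
from prefect.engine.signals import LOOP

@task
def poll_run():
    n = prefect.context.get("task_loop_count", 1)
    v_run_code = check_status()  # hypothetical status check
    if v_run_code != "DONE":
        raise LOOP(message=f"Status Run {n}={v_run_code}",
                   result=dict(n=n + 1, code=v_run_code))
    return v_run_code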
Andrew Moore
04/18/2023, 3:34 PM
flapili
04/18/2023, 3:42 PM
Max Eggers
04/18/2023, 3:58 PM
Toby Drane
04/18/2023, 4:46 PM
Slackbot
04/18/2023, 4:51 PM
Igs
04/18/2023, 5:12 PM
Matheus Rocha
04/18/2023, 5:30 PM
A >> [B, C]
[B, C] >> D
E
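If the goal is that Airflow-style graph in Prefect 2, one way is explicit wait_for on submitted tasks; a through e below are assumed stand-ins for the real tasks:

from prefect import flow, task

@task
def a(): ...
@task
def b(): ...
@task
def c(): ...
@task
def d(): ...
@task
def e(): ...

@flow
def dag():
    ra = a.submit()
    rb = b.submit(wait_for=[ra])  # A >> B
    rc = c.submit(wait_for=[ra])  # A >> C
    d.submit(wait_for=[rb, rc])   # [B, C] >> D
    e.submit()                    # E runs independently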
Abhinav Chordia
04/18/2023, 5:50 PM
Zachary Loertscher
04/18/2023, 5:50 PM"n_workers"
kwarg. I've tried "n_workers":8
and "n_workers":1
, but they both kick off all 14 of my tasks at the same time
What I want to do is the following (worked in prefect 1), but "scheduler ":"threads" is not accepted as a kwarg.
task_runner=DaskTaskRunner(  # allows for parallel execution
    cluster_kwargs={
        "n_workers": 8,
        "scheduler": "threads",
    }
)
For context, I need it to run only 8 tasks at a time, not all 14. If it runs all of them, it crashes the CPU on my EC2 instance. I'm poring over the docs but I'm not seeing anything on this.
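One sketch that may cap concurrency at 8, assuming DaskTaskRunner's default distributed.LocalCluster: n_workers and threads_per_worker are standard LocalCluster kwargs, and 8 workers with 1 thread each means at most 8 tasks execute at once:

from prefect_dask import DaskTaskRunner

task_runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 8,
        "threads_per_worker": 1,
    }
)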
scott
04/18/2023, 6:59 PM