Hicham Benbriqa
02/11/2023, 9:23 PM
Rohit Motiani
02/12/2023, 10:52 PM
Shivan Trivedi
02/13/2023, 8:06 AM
Nick Williams
02/13/2023, 1:50 PM
Austin Weisgrau
02/13/2023, 9:23 PM
from flows.helloworld.helloworld_flow import helloworld
from prefect.deployments import Deployment
from prefect_aws.ecs import ECSTask
from prefect_aws.s3 import S3Bucket

s3_storage = S3Bucket.load("prod")
ecs_task = ECSTask.load("prod")
helloworld_deployment = Deployment.build_from_flow(
    flow=helloworld,
    name="Hello World",
    storage=s3_storage,
    path="helloworld",
    infrastructure=ecs_task,
)
helloworld_deployment.apply()
is giving me
$ python deployments.py
Traceback (most recent call last):
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 112, in _error_wrapper
    return await func(*args, **kwargs)
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/aiobotocore/client.py", line 358, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (SignatureDoesNotMatch) when calling the PutObject operation: The request signature we calculated does not match the signature you provided. Check your key and signing method.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "~/code/wfp/wfp-prefect/deployments.py", line 29, in <module>
    helloworld_deployment = WFPDeployment(
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 230, in coroutine_wrapper
    return run_async_in_new_loop(async_fn, *args, **kwargs)
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 181, in run_async_in_new_loop
    return anyio.run(partial(__fn, *args, **kwargs))
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "~/.pyenv/versions/3.10.9/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "~/.pyenv/versions/3.10.9/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/deployments.py", line 755, in build_from_flow
    await deployment.upload_to_storage(ignore_file=ignore_file)
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/deployments.py", line 600, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/filesystems.py", line 492, in put_directory
    return await self.filesystem.put_directory(
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/filesystems.py", line 368, in put_directory
    self.filesystem.put_file(f, fpath, overwrite=True)
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/fsspec/asyn.py", line 114, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/fsspec/asyn.py", line 99, in sync
    raise return_result
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/fsspec/asyn.py", line 54, in _runner
    result[0] = await coro
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 1101, in _put_file
    await self._call_s3(
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 339, in _call_s3
    return await _error_wrapper(
  File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 139, in _error_wrapper
    raise err
PermissionError: The request signature we calculated does not match the signature you provided. Check your key and signing method.
Austin Weisgrau
02/13/2023, 10:51 PM
Theodor Sjöstedt
02/14/2023, 9:31 AM
schedule:
  cron: '*/5 * * * *'
  timezone: null
  day_or: true
is_schedule_active: true
To have it run every 5 minutes, and then applied it.
When I run prefect agent start -q 'test' I see output from my deployment every 5 minutes, so it's working, but as soon as I press Ctrl + C to exit the agent's output, it stops and nothing is scheduled. How do I get the agent to run without my terminal? 🙂
Theodor Sjöstedt
02/14/2023, 2:21 PM
Ronil
02/14/2023, 10:55 PM
Shivan Trivedi
02/16/2023, 5:10 PM
Austin Weisgrau
02/16/2023, 11:40 PM
prefect.get_run_logger() is incompatible with testing task functions.
Here is a minimal example, based on the documentation for testing task functions. I only added the two lines defining and using the logger.
import pytest
from prefect import flow, get_run_logger, task
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    with prefect_test_harness():
        yield


@task
def my_favorite_task():
    logger = get_run_logger()
    logger.info("running task")
    return 42


@flow
def my_favorite_flow():
    val = my_favorite_task()
    return val


def test_my_favorite_task():
    assert my_favorite_task.fn() == 42
Raises MissingContextError("There is no active flow or task run context.") on the call to prefect.get_run_logger().
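One possible workaround, sketched here under assumptions rather than taken from the thread: wrap the logger lookup so that it falls back to a standard-library logger when no run context is active. The helper name logger_or_fallback and its parameters are hypothetical. The lookup function and the exception types are passed in by the caller, so the sketch runs without Prefect installed.

```python
import logging
from typing import Callable, Tuple, Type, Union

LoggerLike = Union[logging.Logger, logging.LoggerAdapter]


def logger_or_fallback(
    get_context_logger: Callable[[], LoggerLike],
    context_errors: Tuple[Type[BaseException], ...],
    fallback_name: str = "fallback",
) -> LoggerLike:
    """Return the context-bound logger, or a plain stdlib logger if none is active.

    Hypothetical helper: the caller supplies both the lookup function and the
    error types that signal "no active context".
    """
    try:
        return get_context_logger()
    except context_errors:
        return logging.getLogger(fallback_name)


# Stand-ins for demonstration only; they mimic a lookup that fails outside a run.
class NoContextError(RuntimeError):
    pass


def lookup_that_fails() -> logging.Logger:
    raise NoContextError("There is no active flow or task run context.")


logger = logger_or_fallback(lookup_that_fails, (NoContextError,))
logger.info("running task")  # goes through the stdlib fallback logger
```

With Prefect, the call inside the task would presumably be logger_or_fallback(get_run_logger, (MissingContextError,)), with MissingContextError imported from prefect.exceptions, so that my_favorite_task.fn() can run in a test without a run context. This has not been checked against a specific Prefect release.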
Sampath
02/20/2023, 6:06 PM
Alejandro Armas
02/20/2023, 6:19 PM
eli yosef
02/21/2023, 7:09 AM
Kalise Richmond
02/21/2023, 5:57 PM
Ethienne Marcelin
02/22/2023, 9:43 AM
Austin Weisgrau
02/22/2023, 8:31 PM
@task decorator that would allow this.
Dendi Handian
02/23/2023, 6:58 AM
localhost:4200
My code repository is at https://github.com/dendihandian/transfermarkt-prefect and requires docker-compose; anyone can try it by running docker-compose up -d
Ouail Bendidi
02/23/2023, 4:48 PM
Nimesh Kumar
02/24/2023, 12:24 PM
def start_inferencing(my_param, file_path):
    job_id = my_param
    algo_id = "402"
    res_1 = generate_uuid.submit(algo_id, job_id)
    call_on_failure_if_failed(res_1)
    res_2 = get_file.submit(file_path)
    call_on_failure_if_failed(res_2)
    res_4 = choose_valid_file.submit(prev_task=res_2)
    call_on_failure_if_failed(res_4)
    res_5 = prepare_request_upload.submit(prev_task=res_4)
    call_on_failure_if_failed(res_5)
    res_6 = send_data_request.submit(prev_task=res_5)
    call_on_failure_if_failed(res_6)
    res_7 = prepare_predict_request.submit(prev_task=res_6)
    call_on_failure_if_failed(res_7)
    res_8 = send_predict_request.submit(prev_task=res_7)
    call_on_failure_if_failed(res_8)
    res_9 = extract_lunit_jobid.submit(prev_task=res_8)
    call_on_failure_if_failed(res_9)
    res_10 = prepare_fetch.submit(prev_task=res_9)
    call_on_failure_if_failed(res_10)
    res_11 = fetch_request.submit(prev_task=res_10)
    call_on_failure_if_failed(res_11)
    # A future returned by .submit() is truthy regardless of the task's
    # outcome, so branch on the task's result instead.
    if res_11.result():
        res_12 = convert_output.submit(prev_task=res_11)
        call_on_failure_if_failed(res_12)
        res_13 = zip_file.submit(prev_task=res_12)
        call_on_failure_if_failed(res_13)
        res_14 = send_to_HGW.submit(prev_task=res_13)
        call_on_failure_if_failed(res_14)
    else:
        pass


if __name__ == "__main__":
    # dict_ is defined elsewhere; assumed to hold the keys my_param and file_path.
    start_inferencing(**dict_)
I want to run convert_output, zip_file, and send_to_HGW only when the condition is true; otherwise the flow should exit.
Wellington Braga
02/24/2023, 2:37 PM
prefect deployment ls --flow-name <flow-name>?
Vera Zabeida
02/24/2023, 5:13 PM
name: Ad Hoc Deployment
env:
  PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
  PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
on:
  push:
    branches:
      - main
jobs:
  ad-hoc-deployment:
    name: Build and apply deployment
    runs-on: ubuntu-latest
    timeout-minutes: 45
    steps:
      - uses: actions/checkout@v3
        with:
          persist-credentials: false
          fetch-depth: 0
      - name: Set up Python 3.11.1
        uses: actions/setup-python@v4
        with:
          python-version: "3.11.1"
          cache: "pip"
          cache-dependency-path: "requirements*.txt"
      - name: Install packages
        run: |
          python -m pip install --upgrade pip
          pip install --upgrade --upgrade-strategy eager -r requirements.txt
      - name: Build and apply deployment
        run: |
          prefect deployment build ./flows/pipeline.py:hello \
            --name "Ad Hoc Deployment" \
            --params '{"user_input": "github action"}' \
            --version $GITHUB_SHA \
            --tag ad-hoc-gh \
            --work-queue github \
            --apply
In the video they set up a --storage-block, using GitHub as the storage block, and pass it to the prefect deployment build command, but I'm wondering why that would be needed. The step above checks out the repo, so to me that means all of the code is already available in this job and no GitHub storage is needed?
I've had one build succeed with this setup, but I didn't see any deployments in the UI. I double-checked my env vars, and since then I've been getting this error message, which isn't very helpful. Here's the repo where you can see it:
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '***/flows/'
Response: {'detail': 'Not Found'}
I'm just trying to get a minimal setup going, and understand what's needed and not needed for that. TIA for any pointers!
Emma Keil-Vine
02/24/2023, 7:17 PM
Cannot create flow run. Failed to reach API at https://api.prefect.cloud/api/accounts/a9c0f124-ca06-4646-a501-57a405ebf3c7/workspaces/43f1b8a7-ed9c-46d2-a88a-55eef95b8ef7/.
-- does anyone have any recommendations?
Austin Weisgrau
02/24/2023, 9:08 PM
task.map() is useful for setting up a set of tasks to run in parallel.
Is it possible to set up a lot of tasks to run in parallel, with some substructure? E.g. a task structure that looks like this image
Austin Weisgrau
02/24/2023, 10:11 PM
prefect.get_run_logger() raises MissingContextError when called in a method called by concurrent.futures.ThreadPoolExecutor. Is this a bug, or is it a Prefect anti-pattern to use ThreadPoolExecutor within a Prefect task?
from concurrent.futures import ThreadPoolExecutor

from prefect import task, flow, get_run_logger


def concurrent_subtask() -> None:
    # This raises MissingContextError
    get_run_logger()


@task
def basic_task():
    get_run_logger().info("This works.")
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        for _ in range(2):
            future = executor.submit(concurrent_subtask)
            futures.append(future)
        for future in futures:
            future.result()


@flow(name="Hello World")
def helloworld() -> None:
    basic_task()
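A likely explanation, stated as an assumption: Prefect 2 appears to keep the run context in contextvars, and threads started by ThreadPoolExecutor normally begin with an empty context, so the lookup fails there. The sketch below uses only the standard library, with a stand-in ContextVar named run_context (hypothetical, not Prefect's API) in place of Prefect's own context. It shows the behaviour and how contextvars.copy_context() can carry the caller's context into a worker thread.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a framework's run context (hypothetical name, not Prefect's API).
run_context = contextvars.ContextVar("run_context")


def read_context() -> str:
    # Raises LookupError when the variable is unset in the current context.
    return run_context.get()


def run_in_threads(propagate: bool) -> list:
    run_context.set("task-run-1")
    results = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = []
        for _ in range(2):
            if propagate:
                # Each submission gets its own copy: a single Context object
                # cannot be entered by two threads at the same time.
                ctx = contextvars.copy_context()
                futures.append(executor.submit(ctx.run, read_context))
            else:
                futures.append(executor.submit(read_context))
        for future in futures:
            try:
                results.append(future.result())
            except LookupError:
                # The worker thread did not see the caller's context.
                results.append(None)
    return results


without_copy = run_in_threads(propagate=False)
with_copy = run_in_threads(propagate=True)
```

In the task above, the equivalent change would be executor.submit(contextvars.copy_context().run, concurrent_subtask), though that has not been tested against Prefect here. A simpler alternative is to call get_run_logger() once in the task body and pass the logger to the worker function as an argument.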
Maryam Veisi
02/27/2023, 7:29 AM
Dendi Handian
02/27/2023, 6:15 PM
Nimesh Kumar
02/28/2023, 11:40 AM
An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied
Can anyone please help me with this? Is there a permission I am missing here?
Ofir
03/01/2023, 7:14 PM
pandas or numpy or any other 3rd party dependency from my workflow, what is the best practice to do that?
The agent running my deployment/workflow might not have these packages in place, right?
Should I build a custom agent Dockerfile with all of the dependencies, or is there a better approach? What are the tradeoffs between the different solutions?
Ofir
03/02/2023, 11:47 PM
POST /model REST API call to the Node.js server, which in turn calls Prefect 2.0 using the REST API to start a new run (of an existing deployment).
The workflow will train the model, run inference on some data and then persist the output to a database.
This is obviously an asynchronous operation that may take a few minutes (or more) to complete.
Assuming the workflow succeeded, I would like to notify the users (those who have Chrome opened to my web app) that something has happened, i.e. training completed.
How should the Node.js app be notified when the flow has finished (either successfully or with a failure)?
Is there a middleware / RabbitMQ / other message queue that the Node.js app can subscribe to, onto which Prefect publishes events?
If not, does Prefect expose other broadcast events? And if not, should I poll periodically from my app and maintain a state diff?
Thanks!
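If polling turns out to be the route taken, the state-diff idea from the last question can be sketched as follows. This is a minimal illustration in Python rather than Node.js, and the names wait_for_terminal_state, fetch_state and on_change are hypothetical. fetch_state stands in for whatever call the app makes to read the flow run's current state, and the set of terminal state names is an assumption to check against the Prefect version in use.

```python
import time
from typing import Callable, Optional

# Assumed terminal state names; verify against the Prefect version in use.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CRASHED", "CANCELLED"}


def wait_for_terminal_state(
    fetch_state: Callable[[], str],
    on_change: Callable[[Optional[str], str], None],
    interval_seconds: float = 5.0,
    max_polls: int = 1000,
) -> Optional[str]:
    """Poll until a terminal state is seen, reporting each state transition.

    Returns the terminal state, or None if max_polls is exhausted first.
    """
    previous: Optional[str] = None
    for _ in range(max_polls):
        current = fetch_state()
        if current != previous:
            # Only transitions are reported, so subscribers are not spammed.
            on_change(previous, current)
            previous = current
        if current in TERMINAL_STATES:
            return current
        time.sleep(interval_seconds)
    return None


# Demonstration with a canned sequence of states instead of a real API call.
canned = iter(["SCHEDULED", "PENDING", "RUNNING", "RUNNING", "COMPLETED"])
transitions = []
final_state = wait_for_terminal_state(
    fetch_state=lambda: next(canned),
    on_change=lambda old, new: transitions.append((old, new)),
    interval_seconds=0.0,
)
```

In the web app, on_change would be the place to push a notification to connected browsers, for example over a WebSocket. Polling trades some latency and extra API calls for not needing any message broker.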