Scott Chamberlain
02/10/2023, 5:14 PMprefect-shell
(we’re currently moving from using shell_run_command
to ShellOperation
). These shell commands all spin up docker containers. Sometimes a container will hang without any logs for up to 24 hrs. The only thing we’ve found to work is “tickling” the process connected to the container (we’ve done e..g, tail /proc/<pid>/fd/2
), and then we get logs again. We haven’t seen this problem when we start docker containers outside of a prefect context, so we’re thinking it’s something to do with prefect. it is possible this will go away as we transition to ShellOperation
, but perhaps not. Curious if anyone has seen this behavior too? (related thread https://prefect-community.slack.com/archives/C048ZHT5U3U/p1675282219967509 )Jaime Raldua Veuthey
02/10/2023, 5:45 PMMathuraju Sivasankar
02/11/2023, 8:37 AMMathuraju Sivasankar
02/11/2023, 2:59 PMJuan David Lozano
02/11/2023, 8:21 PMcd prefect && python3 -m pip install .
I get an error that prefect needs python 3.7, I try to upgrade to python 3.7 but I think (the repo was made with a lower version of python? I could be wrong just starting to understand docker) is there a more updated version of this article? or somebody can point me on how to install prefect on google cloud?Hicham Benbriqa
02/11/2023, 9:20 PMprefect version
:
Version: 2.3.1
API version: 0.8.0
Python version: 3.9.7
Git commit: 1d485b1d
Built: Thu, Sep 1, 2022 3:53 PM
OS/Arch: linux/x86_64
Profile: default
Server type: <client error>
Hicham Benbriqa
02/11/2023, 9:22 PMfrom prefect import task, flow
@task
def a():
return 1+1
@flow
def flow1():
print(a())
if __name__ == '__main__':
flow1()
Hicham Benbriqa
02/11/2023, 9:22 PMThe above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "test-prefect.py", line 12, in <module>
flow1()
.....
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: block_spec
[SQL: ALTER TABLE block_spec RENAME TO block_schema]
(Background on this error at: <https://sqlalche.me/e/14/e3q8>)
Hicham Benbriqa
02/11/2023, 9:23 PMRohit Motiani
02/12/2023, 10:52 PMShivan Trivedi
02/13/2023, 8:06 AMNick Williams
02/13/2023, 1:50 PMAustin Weisgrau
02/13/2023, 9:23 PMfrom flows.helloworld.helloworld_flow import helloworld
from prefect.deployments import Deployment
from prefect_aws.ecs import ECSTask
from prefect_aws.s3 import S3Bucket
s3_storage = S3Bucket.load("prod")
ecs_task = ECSTask.load("prod")
helloworld_deployment = Deployment.build_from_flow(
flow=helloworld,
name="Hello World",
storage=s3_storage,
path="helloworld",
infrastructure=ecs_task,
)
helloworld_deployment.apply()
is giving me
$ python deployments.py
Traceback (most recent call last):
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 112, in _error_wrapper
return await func(*args, **kwargs)
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/aiobotocore/client.py", line 358, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (SignatureDoesNotMatch) when calling the PutObject operation: The request signature we calculated does not match the signature you provided. Check your key and signing method.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "~/code/wfp/wfp-prefect/deployments.py", line 29, in <module>
helloworld_deployment = WFPDeployment(
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 230, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 181, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "~/.pyenv/versions/3.10.9/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "~/.pyenv/versions/3.10.9/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/deployments.py", line 755, in build_from_flow
await deployment.upload_to_storage(ignore_file=ignore_file)
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/deployments.py", line 600, in upload_to_storage
file_count = await self.storage.put_directory(
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/filesystems.py", line 492, in put_directory
return await self.filesystem.put_directory(
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/prefect/filesystems.py", line 368, in put_directory
self.filesystem.put_file(f, fpath, overwrite=True)
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/fsspec/asyn.py", line 114, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/fsspec/asyn.py", line 99, in sync
raise return_result
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/fsspec/asyn.py", line 54, in _runner
result[0] = await coro
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 1101, in _put_file
await self._call_s3(
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 339, in _call_s3
return await _error_wrapper(
File "~/.pyenv/versions/prefect/lib/python3.10/site-packages/s3fs/core.py", line 139, in _error_wrapper
raise err
PermissionError: The request signature we calculated does not match the signature you provided. Check your key and signing method.
Austin Weisgrau
02/13/2023, 10:51 PMTheodor Sjöstedt
02/14/2023, 9:31 AMschedule:
cron: '*/5 * * * *'
timezone: null
day_or: true
is_schedule_active: true
To have it run every 5 minutes, and then applied it.
When I run
prefect agent start -q 'test'
I see output from my deployment every 5 minutes, so its working, but as soon as I ctrc + C
to exit the output of the agent, it stops, and nothing is scheduled. How do I get the agent to run without my terminal? 🙂Theodor Sjöstedt
02/14/2023, 2:21 PMAnil
02/14/2023, 10:55 PMShivan Trivedi
02/16/2023, 5:10 PMAustin Weisgrau
02/16/2023, 11:40 PMprefect.get_run_logger()
is incompatible with testing task functions.
A minimal example based on the documentation for testing task functions. I only added the two lines defining and using the logger.
import pytest
from prefect import flow, get_run_logger, task
from prefect.testing.utilities import prefect_test_harness
@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
with prefect_test_harness():
yield
@task
def my_favorite_task():
logger = get_run_logger()
<http://logger.info|logger.info>("running task")
return 42
@flow
def my_favorite_flow():
val = my_favorite_task()
return val
def test_my_favorite_task():
assert my_favorite_task.fn() == 42
Raises MissingContextError("There is no active flow or task run context.")
on the call to prefect.get_run_logger()
Sampath Sukesh Ravolaparthi
02/20/2023, 6:06 PMAlejandro Armas
02/20/2023, 6:19 PMeli yosef
02/21/2023, 7:09 AMKalise Richmond
02/21/2023, 5:57 PMGuillaume Bertrand
02/22/2023, 9:43 AMAustin Weisgrau
02/22/2023, 8:31 PM@task
decorator that would allow this.Dendi Handian
02/23/2023, 6:58 AMlocalhost:4200
Here is my code repository https://github.com/dendihandian/transfermarkt-prefect, it requires docker-compose and anyone can try it by executing docker-compose up -d
Ouail Bendidi
02/23/2023, 4:48 PMNimesh Kumar
02/24/2023, 12:24 PMef start_inferencing(my_param, file_path):
job_id = my_param
algo_id = "402"
res_1 = generate_uuid.submit(algo_id, job_id)
call_on_failure_if_failed(res_1)
res_2 = get_file.submit(file_path)
call_on_failure_if_failed(res_2)
res_4 = choose_valid_file.submit(prev_task=res_2)
call_on_failure_if_failed(res_4)
res_5 = prepare_request_upload.submit(prev_task=res_4)
call_on_failure_if_failed(res_5)
res_6 = send_data_request.submit(prev_task=res_5)
call_on_failure_if_failed(res_6)
res_7 = prepare_predict_request.submit(prev_task=res_6)
call_on_failure_if_failed(res_7)
res_8 = send_predict_request.submit(prev_task=res_7)
call_on_failure_if_failed(res_8)
res_9 = extract_lunit_jobid.submit(prev_task=res_8)
call_on_failure_if_failed(res_9)
res_10 = prepare_fetch.submit(prev_task=res_9)
call_on_failure_if_failed(res_10)
res_11 = fetch_request.submit(prev_task=res_10)
call_on_failure_if_failed(res_11)
if res_11:
res_12 = convert_output.submit(prev_task=res_11)
call_on_failure_if_failed(res_12)
res_13 = zip_file.submit(prev_task=res_12)
call_on_failure_if_failed(res_13)
res_14 = send_to_HGW.submit(prev_task=res_13)
call_on_failure_if_failed(res_14)
else:
pass
if __name__ == "main":
start_inferencing(parameters=dict_)
i want to reun convert_output, zip_file, send to hgw, only when condition is true, else exit.Wellington Braga
02/24/2023, 2:37 PMprefect deployment ls --flow-name <flow-name>
?Vera Zabeida
02/24/2023, 5:13 PMvideo▾
name: Ad Hoc Deployment
env:
PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
on:
push:
branches:
- main
jobs:
ad-hoc-deployment:
name: Build and apply deployment
runs-on: ubuntu-latest
timeout-minutes: 45
steps:
- uses: actions/checkout@v3
with:
persist-credentials: false
fetch-depth: 0
- name: Set up Python 3.11.1
uses: actions/setup-python@v4
with:
python-version: "3.11.1"
cache: "pip"
cache-dependency-path: "requirements*.txt"
- name: Install packages
run: |
python -m pip install --upgrade pip
pip install --upgrade --upgrade-strategy eager -r requirements.txt
- name: Build and apply deployment
run: |
prefect deployment build ./flows/pipeline.py:hello \
--name "Ad Hoc Deployment" \
--params '{"user_input": "github action"}' \
--version $GITHUB_SHA \
--tag ad-hoc-gh \
--work-queue github \
--apply
In the video they set up a --storage-block
and use github as a storage block, and use that on the prefect deployment build command, but I'm wondering why that'd be needed. In the step above, we're checking out the repo so for me that would mean that we have all of the code already in this job, so no github storage is needed?
I've had one build that succeeded with this setup, but I didn't see any deployments in the UI, double checked my env vars, and since then getting this error message that isn't very helpful, here's the repo where you can see it:
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '***/flows/'
Response: {'detail': 'Not Found'}
I'm just trying to get a minimal setup going, and understand what's needed and not needed for that. TIA for any pointers!Vera Zabeida
02/24/2023, 5:13 PMvideo▾
name: Ad Hoc Deployment
env:
PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
PREFECT_API_URL: ${{ secrets.PREFECT_API_URL }}
on:
push:
branches:
- main
jobs:
ad-hoc-deployment:
name: Build and apply deployment
runs-on: ubuntu-latest
timeout-minutes: 45
steps:
- uses: actions/checkout@v3
with:
persist-credentials: false
fetch-depth: 0
- name: Set up Python 3.11.1
uses: actions/setup-python@v4
with:
python-version: "3.11.1"
cache: "pip"
cache-dependency-path: "requirements*.txt"
- name: Install packages
run: |
python -m pip install --upgrade pip
pip install --upgrade --upgrade-strategy eager -r requirements.txt
- name: Build and apply deployment
run: |
prefect deployment build ./flows/pipeline.py:hello \
--name "Ad Hoc Deployment" \
--params '{"user_input": "github action"}' \
--version $GITHUB_SHA \
--tag ad-hoc-gh \
--work-queue github \
--apply
In the video they set up a --storage-block
and use github as a storage block, and use that on the prefect deployment build command, but I'm wondering why that'd be needed. In the step above, we're checking out the repo so for me that would mean that we have all of the code already in this job, so no github storage is needed?
I've had one build that succeeded with this setup, but I didn't see any deployments in the UI, double checked my env vars, and since then getting this error message that isn't very helpful, here's the repo where you can see it:
prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url '***/flows/'
Response: {'detail': 'Not Found'}
I'm just trying to get a minimal setup going, and understand what's needed and not needed for that. TIA for any pointers!