Guillaume Bertrand
01/25/2023, 4:48 PM
Arturo Martínez Pacheco
01/27/2023, 12:04 AM
Nimesh Kumar
01/30/2023, 10:02 AM
Nimesh Kumar
01/31/2023, 12:03 PM
Jezreel Zamora
02/01/2023, 6:12 AM
Docker(files={"additional_file.py": "additional_file.py"})
Question:
• How can we set additional files that our flow needs when using S3 storage with the ECSRun configuration?
• If it's not possible to set additional files, is there a recommended workaround?
Jinnzy
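One possible workaround, sketched below under assumptions (the bucket name, prefix, and helper are hypothetical, not Prefect API): push the extra files to the same S3 bucket the flow is stored in, keyed next to the flow code, and download them inside a task.

```python
# Hedged sketch: Prefect's S3 storage only uploads the flow itself, so extra
# files can be uploaded separately under the same prefix the flow lives at.
def s3_keys_for(paths, prefix="flows/my-flow"):
    """Map each extra local file to a hypothetical key beside the stored flow."""
    return {p: f"{prefix}/{p}" for p in paths}

# Then, with boto3 (requires AWS credentials; bucket name is an assumption):
#   s3 = boto3.client("s3")
#   for local, key in s3_keys_for(["additional_file.py"]).items():
#       s3.upload_file(local, "my-flow-storage", key)

print(s3_keys_for(["additional_file.py"]))
```

Inside the flow run, the same keys can be fetched back with `s3.download_file` before the code that needs them executes.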
02/01/2023, 9:18 AM
flow = Deployment.build_from_flow(
    flow=fusionist_flow,
    name=f"fusionist-{i}",
    schedule=IntervalSchedule(
        interval=timedelta(hours=24).total_seconds(),
        timezone="Asia/Shanghai",
        anchor_date=pendulum.now().add(seconds=15),
    ),
    parameters={"account_index": i},
    work_queue_name="browser",
    ignore_file=".prefectignore",
)
flow.apply()
cat .prefectignore
venv
web
Slackbot
02/01/2023, 12:53 PM
Scott Chamberlain
02/01/2023, 8:10 PM
We use prefect-shell to send commands that spin up Docker containers. We do get some logs from within the container processes, but not all of them; then we have to find the container or process on our VM and look at the logs there. It would be great to have all logs show up in the Prefect dashboard. It's not clear to me why we get some logs and not others from our Docker containers. Any guidance would be appreciated.
Guillaume Bertrand
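One way to get container output into Prefect's logger is to attach to the container's log stream from within the task itself. A minimal sketch, not Prefect's API (the command and forwarding target are assumptions):

```python
import subprocess

# Hedged sketch: stream a command's combined stdout/stderr line by line so
# each line can be forwarded to a logger, e.g. with
#   cmd = ["docker", "logs", "-f", container_name]
# inside a task that calls get_run_logger().info(line) per line.
def stream_output(cmd):
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    for line in proc.stdout:
        yield line.rstrip("\n")
    proc.wait()

# Demonstrated with a harmless command instead of docker:
print(list(stream_output(["echo", "hello from container"])))
```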
02/02/2023, 9:40 AM
@flow
def my_flow():
    a = subflow1(...)
    b = subflow2(...)
    c = subflow3(a)
Would it be possible to create a graph from this and raise an error because subflow2 is a dangling node?
Nimesh Kumar
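The check itself is straightforward once the call graph is available. A sketch of the idea (assumption: "dangling" means a subflow that neither feeds nor consumes any other subflow's result):

```python
# Build a dependency graph of subflow calls and flag isolated nodes.
def isolated_subflows(nodes, edges):
    """Return subflows that appear in no (producer, consumer) edge."""
    connected = {n for edge in edges for n in edge}
    return sorted(n for n in nodes if n not in connected)

# In my_flow above, only subflow1 -> subflow3 carries data (via `a`);
# subflow2's result `b` goes nowhere, so it is isolated.
nodes = ["subflow1", "subflow2", "subflow3"]
edges = [("subflow1", "subflow3")]
print(isolated_subflows(nodes, edges))  # ['subflow2']
```

Prefect does not expose such a static graph check directly; the edges would have to be extracted from the flow definition (e.g. by inspecting which futures are passed between calls).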
02/02/2023, 11:45 AM
from prefect.client import get_client
from datetime import datetime
import asyncio
import random
import uuid

now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print("Current Time =", current_time)

async def trigger_flow_runs():
    client = get_client()
    print(client, ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
    d_id_list = ["ddf0ae5c-7ed4-48b2-b417-535d767933f2"]
    chosen_queue = random.choice(d_id_list)
    print("Deployment ID ", chosen_queue)
    param = {"my_param": str(uuid.uuid4()), "file_path": "root/flows/tmp_folder"}
    print(param)
    t = await client.create_flow_run_from_deployment(
        deployment_id=chosen_queue,
        parameters=param,
        name=str(uuid.uuid4()),
    )
    print("." * 150)
    print(t)
    print("." * 150)

if __name__ == "__main__":
    asyncio.run(trigger_flow_runs())
but it gives me the error below; I don't know why it is trying to use sqlite3
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: deployment
[SQL: SELECT deployment.id AS deployment_id, deployment.created AS deployment_created, deployment.updated AS deployment_updated, deployment.name AS deployment_name, deployment.version AS deployment_version, deployment.description AS deployment_description, deployment.manifest_path AS deployment_manifest_path, deployment.work_queue_name AS deployment_work_queue_name, deployment.infra_overrides AS deployment_infra_overrides, deployment.path AS deployment_path, deployment.entrypoint AS deployment_entrypoint, deployment.schedule AS deployment_schedule, deployment.is_schedule_active AS deployment_is_schedule_active, deployment.tags AS deployment_tags, deployment.parameters AS deployment_parameters, deployment.parameter_openapi_schema AS deployment_parameter_openapi_schema, deployment.created_by AS deployment_created_by, deployment.updated_by AS deployment_updated_by, deployment.flow_id AS deployment_flow_id, deployment.work_pool_queue_id AS deployment_work_pool_queue_id, deployment.infrastructure_document_id AS deployment_infrastructure_document_id, deployment.storage_document_id AS deployment_storage_document_id, work_pool_1.id AS work_pool_1_id, work_pool_1.created AS work_pool_1_created, work_pool_1.updated AS work_pool_1_updated, work_pool_1.name AS work_pool_1_name, work_pool_1.description AS work_pool_1_description, work_pool_1.type AS work_pool_1_type, work_pool_1.base_job_template AS work_pool_1_base_job_template, work_pool_1.is_paused AS work_pool_1_is_paused, work_pool_1.default_queue_id AS work_pool_1_default_queue_id, work_pool_1.concurrency_limit AS work_pool_1_concurrency_limit, work_pool_queue_1.id AS work_pool_queue_1_id, work_pool_queue_1.created AS work_pool_queue_1_created, work_pool_queue_1.updated AS work_pool_queue_1_updated, work_pool_queue_1.name AS work_pool_queue_1_name, work_pool_queue_1.description AS work_pool_queue_1_description, work_pool_queue_1.is_paused AS work_pool_queue_1_is_paused, work_pool_queue_1.concurrency_limit AS 
work_pool_queue_1_concurrency_limit, work_pool_queue_1.priority AS work_pool_queue_1_priority, work_pool_queue_1.work_pool_id AS work_pool_queue_1_work_pool_id
FROM deployment LEFT OUTER JOIN work_pool_queue AS work_pool_queue_1 ON work_pool_queue_1.id = deployment.work_pool_queue_id LEFT OUTER JOIN work_pool AS work_pool_1 ON work_pool_1.id = work_pool_queue_1.work_pool_id
WHERE deployment.id = :pk_1]
[parameters: {'pk_1': 'ddf0ae5c-7ed4-48b2-b417-535d767933f2'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
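The SQLite fallback usually means the client can't see an API URL: when PREFECT_API_URL is not set in the environment the script runs in, get_client() falls back to an ephemeral local SQLite database instead of talking to the Orion server. A minimal sketch, assuming the server address used elsewhere in this setup:

```python
import os

# Hypothetical address; adjust to wherever your Orion API is reachable from
# the machine running this script.
os.environ.setdefault("PREFECT_API_URL", "http://172.16.12.17:4200/api")

# With PREFECT_API_URL set before get_client() is first called, flow runs are
# created against the Postgres-backed server rather than a local SQLite file.
print(os.environ["PREFECT_API_URL"])
```

Setting the same value via `prefect config set PREFECT_API_URL=...` in the shell that runs the script is equivalent.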
I am using the docker-compose file below:
version: "3.3"

services:
  ### Prefect Database
  database:
    image: postgres:15.1-alpine
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=orion
    expose:
      - 5432
    volumes:
      - db:/var/lib/postgresql/data

  ### MinIO for flow storage
  minio:
    image: minio/minio:latest
    entrypoint: ["minio", "server", "--address", "0.0.0.0:9000", "--console-address", "0.0.0.0:9002", "/data"]
    volumes:
      - "minio:/data"
    ports:
      - 9000:9000
      - 9002:9002

  ### Prefect Orion API
  orion:
    image: prefecthq/prefect:2.7.7-python3.11
    restart: always
    entrypoint: ["prefect", "orion", "start"]
    working_dir: "/root/flows"
    volumes:
      - prefect:/root/.prefect
      - "./flows:/root/flows"
    environment:
      - PREFECT_API_URL=http://172.16.12.17:4200/api
      - PREFECT_ORION_API_HOST=0.0.0.0
      - PREFECT_ORION_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:postgres@database:5432/orion
    ports:
      - 4200:4200
    depends_on:
      - database

  ## Prefect Agent
  agent:
    image: prefecthq/prefect:2.7.7-python3.11
    restart: always
    entrypoint: ["prefect", "agent", "start", "-q", "rbfracture", "testing"]
    environment:
      - PREFECT_API_URL=http://172.16.12.17:4200/api

  # ### Prefect CLI
  # cli:
  #   image: prefecthq/prefect:2.7.7-python3.11
  #   entrypoint: "bash"
  #   working_dir: "/root/flows"
  #   volumes:
  #     - "./flows:/root/flows"
  #   environment:
  #     - PREFECT_API_URL=http://orion:4200/api

volumes:
  prefect:
  db:
  minio:

networks:
  default:
    external:
      name: carpl_docker_backend
Can anyone please tell me how I can trigger it?
Arturo Martínez Pacheco
02/03/2023, 2:59 PM
Shivan Trivedi
02/03/2023, 3:32 PM
Mwanthi Daniel
02/04/2023, 1:05 AM
KeyError: 'credentials'
nicholasnet
02/04/2023, 6:43 PM
Broder Peters
02/06/2023, 11:38 AM
prefect deployment build my_flow.py:do_parallel_stuff -n docker -ib docker-container/docker -sb s3/some-bucket
@flow(task_runner=DaskTaskRunner)
def do_parallel_stuff():
    for x in y:
        result = do_heavy_stuff_in_own_environment.submit(x)

@task()
def do_heavy_stuff_in_own_environment(x: str):
    # Does heavy stuff
    ...
Thanks in advance!
Scott Chamberlain
02/06/2023, 2:54 PM
Nimesh Kumar
02/07/2023, 11:35 AM
11:31:19.076 | INFO | prefect.agent - Submitting flow run 'f8b44688-71ae-464f-ba5a-2220ee44f475'
11:31:19.105 | INFO | prefect.infrastructure.process - Opening process '<function uuid4 at 0x7fd67974b560>'...
11:31:19.110 | INFO | prefect.agent - Completed submission of flow run 'f8b44688-71ae-464f-ba5a-2220ee44f475'
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
11:31:20.431 | INFO | Flow run '<function uuid4 at 0x7fd67974b560>' - Downloading flow code from storage at ''
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 262, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 247, in asyncnullcontext
yield
File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/deployments.py", line 170, in load_flow_from_flow_run
await storage_block.get_directory(from_path=deployment.path, local_path=".")
File "/usr/local/lib/python3.11/site-packages/prefect/filesystems.py", line 468, in get_directory
return await self.filesystem.get_directory(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/filesystems.py", line 313, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
error from daemon in stream: Error grabbing logs: unexpected EOF
Can anyone please help?
James Gatter
02/07/2023, 9:28 PM
Rohit Motiani
02/09/2023, 1:58 AM
Jason
02/09/2023, 2:05 PM
(zoom) $ prefect deployment build etl_web_to_gcs.py:etl_web_to_gcs -q gh -n "gh-web2gch-2" -sb github/ghs-webtogcs -o gh-storage-etl-deployment.yaml --apply
I know it is pulling from the local repo and not from GitHub because, among other reasons, I accidentally left an error in my local etl script that isn't in my GitHub repo's etl script, and that error shows up in the deployment.
This is from the logs:
Flow run 'gamma2-ixkun-c' - Downloading flow code from storage at ''
Austin Weisgrau
02/09/2023, 6:18 PM
Austin Weisgrau
02/09/2023, 7:34 PM
FROM prefecthq/prefect:2-python3.10
Here is the error I get when I try to run a flow using that Docker image:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 268, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 167, in load_flow_from_flow_run
storage_block = Block._from_block_document(storage_document)
File "/usr/local/lib/python3.10/site-packages/prefect/blocks/core.py", line 571, in _from_block_document
else cls.get_block_class_from_schema(block_document.block_schema)
File "/usr/local/lib/python3.10/site-packages/prefect/blocks/core.py", line 591, in get_block_class_from_schema
return lookup_type(cls, block_schema_to_key(schema))
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/dispatch.py", line 186, in lookup_type
raise KeyError(
KeyError: "No class found for dispatch key 's3-bucket' in registry for type 'Block'."
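The dispatch key 's3-bucket' belongs to the S3Bucket block, which ships in the prefect-aws collection; this KeyError typically means prefect-aws is not installed in the image that executes the flow. A quick diagnostic sketch (the cause is an assumption, not a confirmed fix):

```python
import importlib.util

# If this prints False when run inside the flow's container, the block class
# can never be registered there. A likely fix (assumption) is adding
#   RUN pip install prefect-aws s3fs
# to the Dockerfile that builds the image.
has_prefect_aws = importlib.util.find_spec("prefect_aws") is not None
print("prefect-aws importable:", has_prefect_aws)
```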
I'm not sure whether this is a misconfiguration of the flow deployment, the S3 block, the Docker container/registry blocks, or something else.
Matt L
02/09/2023, 9:32 PM
Austin Weisgrau
02/09/2023, 10:51 PM
Scott Chamberlain
02/10/2023, 5:14 PM
We use prefect-shell (we're currently moving from shell_run_command to ShellOperation). These shell commands all spin up Docker containers. Sometimes a container will hang without any logs for up to 24 hours. The only thing we've found that works is "tickling" the process connected to the container (we've done, e.g., tail /proc/<pid>/fd/2), and then we get logs again. We haven't seen this problem when we start Docker containers outside of a Prefect context, so we're thinking it's something to do with Prefect. It's possible this will go away as we transition to ShellOperation, but perhaps not. Curious if anyone has seen this behavior too? (Related thread: https://prefect-community.slack.com/archives/C048ZHT5U3U/p1675282219967509)
Jaime Raldua Veuthey
02/10/2023, 5:45 PM
Mathuraju Sivasankar
02/11/2023, 8:37 AM
Mathuraju Sivasankar
02/11/2023, 2:59 PM
Juan David Lozano
02/11/2023, 8:21 PM
cd prefect && python3 -m pip install .
I get an error that Prefect needs Python 3.7. I tried to upgrade to Python 3.7, but I think the repo was made with a lower version of Python? (I could be wrong; I'm just starting to understand Docker.) Is there a more up-to-date version of this article, or can somebody point me to how to install Prefect on Google Cloud?
Hicham Benbriqa
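Since the installer complains about the Python version, it can help to confirm which interpreter the container actually uses before running pip. A minimal sketch (the 3.7 floor is taken from the error message, not from Prefect's current requirements):

```python
import sys

# Compare the running interpreter against the minimum the installer demands;
# `python3` inside the container may be older than the one on the host.
ok = sys.version_info >= (3, 7)
print("Python", ".".join(map(str, sys.version_info[:3])), "meets 3.7+:", ok)
```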
02/11/2023, 9:20 PM
prefect version:
Version: 2.3.1
API version: 0.8.0
Python version: 3.9.7
Git commit: 1d485b1d
Built: Thu, Sep 1, 2022 3:53 PM
OS/Arch: linux/x86_64
Profile: default
Server type: <client error>