Abhishek Singh
03/26/2024, 7:49 AM
07:47:06.504 | ERROR | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(2, b'/tmp/prefec...us redirect\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(2, b'/tmp/prefec...us redirect\n')>
Traceback (most recent call last):
File "/usr/lib64/python3.11/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/usr/lib64/python3.11/asyncio/subprocess.py", line 72, in pipe_data_received
reader.feed_data(data)
File "/usr/lib64/python3.11/asyncio/streams.py", line 480, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
But if I try the same thing using run(), it works fine. What am I doing wrong?
Below is my trigger() logic:
with ShellOperation(
    commands=[
        "mysqldump -h ${DB_HOST} -u ${DB_USER} -p ${DB_USER_PASSWORD} --databases --default-character-set=utf8 --events --routines --triggers --set-gtid-purged=OFF ${db} > ${backup_file_path}"
    ],
    env={
        "DB_HOST": DB_HOST,
        "DB_USER": DB_USER,
        "DB_USER_PASSWORD": DB_USER_PASSWORD,
        "db": db,
        "backup_file_path": backup_file_path,
    },
) as shell_operation:
    shell_process = shell_operation.trigger()
    shell_process.wait_for_completion()
    shell_output = shell_process.fetch_result()
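A likely culprit, offered as a hedged sketch rather than a confirmed fix: ShellOperation.trigger(), wait_for_completion(), and fetch_result() are sync-compatible in prefect-shell, so inside an async flow each call returns a coroutine that must be awaited; calling them bare can tear the subprocess streams down mid-read. The flow name and command below are placeholders.

from prefect import flow
from prefect_shell import ShellOperation

@flow
async def backup_flow():
    # inside an async flow, each sync-compatible call must be awaited
    with ShellOperation(commands=["echo hello"]) as shell_operation:
        shell_process = await shell_operation.trigger()
        await shell_process.wait_for_completion()
        return await shell_process.fetch_result()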
Netanel Malka
03/26/2024, 8:44 AM
prefect.yaml:
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: ~/company-data-engineering/prefect-data-engineering/src

deployments:
  - name: iceberg-sanity
    tags:
      - maintenance
      - de
    entrypoint: flows/iceberg_sanity.py:iceberg_sanity
    parameters:
      p1: mountain
      p2: true
      p3:
        - p4: 1
        - p5: 2
    schedule:
      - cron: "0 0 * * *"
        active: true
  - name: data-purge
    tags:
      - maintenance
      - de
    parameters:
      job_image: company-job-data-purge
      job_resources:
        limits:
          cpu: 1
          memory: 2Gi
        requests:
          cpu: 0.5
          memory: 1Gi
      job_envs:
        RETENTION: 90 days
        ALERTS_RETENTION: 90 days
    entrypoint: flows/data_purge.py:data_purge
    schedule:
      - cron: "0 0 * * *"
        active: true
I am running this command:
prefect --no-prompt deploy --all --pool default-kubernetes-pool --prefect-file prefect.yaml
But I get this output:
You have passed options to the deploy command, but you are creating or updating
multiple deployments. These options will be ignored.
Deploying all flows with an existing deployment configuration...
I have seen a reference in the docs to using deploy --all with multiple deployments:
https://docs.prefect.io/latest/guides/deployment/kubernetes/#define-a-prefect-deployment
Quoted from the docs:
To deploy your flows, ensure your Docker daemon is running first. Deploy all the flows with prefect deploy --all or deploy them individually by name: prefect deploy -n hello/default or prefect deploy -n hello/arthur.
It should be working, right? Is that a bug?
Thanks.
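A hedged workaround sketch, not a confirmed fix: since CLI options are ignored when deploying multiple deployments with --all, the pool can instead be declared per deployment in prefect.yaml via the work_pool key, which the deploy command does honor.

deployments:
  - name: iceberg-sanity
    work_pool:
      name: default-kubernetes-pool
  - name: data-purge
    work_pool:
      name: default-kubernetes-pool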
Mike Logaciuk
03/26/2024, 9:37 AM
Karol Wolski
03/26/2024, 10:10 AM
Jiaqiang Li
03/26/2024, 3:09 PM
My serve code is:
if __name__ == "__main__":
    everflow_clicks_conversions_api_to_gcp.serve(
        name="everflow_deployment",
        cron="0 10 * * *",
        tags=["testing", "tutorial"],
        description="Given a GitHub repository, logs repository statistics for that repo.",
        version="tutorial/deployments",
    )
My deploy code Everflow_deployment_test1.py is:
from prefect import flow
from prefect_gitlab.repositories import GitLabRepository
from prefect_gitlab.credentials import GitLabCredentials
from prefect.blocks.system import Secret

if __name__ == "__main__":
    flow.from_source(
        source=GitLabRepository(
            repository="https://gitlab.com/excelimpact/data_eng/data_etl/prefect/repository.git",
            reference="main",
            credentials=GitLabCredentials(token="glpat-******-*********"),
        ),
        entrypoint="workflow/everflow_workflow_test1.py:everflow_clicks_conversions_api_to_gcp",
    ).deploy(
        name="Everflow-deployment-test1",
        work_pool_name="my-managed-pool",
        cron="0 10 * * *",
    )
My gitlab-ci.yml:
stages:
  - deploy

deploy:
  stage: deploy
  image: python:3.10
  before_script:
    - export PREFECT_API_KEY=$PREFECT_API_KEY
    #- echo $PREFECT_API_KEY
    - pip install -r requirements.txt
    #- |
    #  curl -s -H "Authorization: Bearer $PREFECT_API_KEY" "https://api.prefect.cloud/api/me/"
    - prefect cloud login --key $PREFECT_API_KEY --workspace excelimpact/default
    - prefect profile inspect
  script:
    - python Everflow_deployment_test1.py
    # - python everflow_workflow_test1.py
  only:
    - main  # Adjust branch as needed
  tags:
    - k8s-runner-excel-staging
The error is:
  File "/builds/excelimpact/data_eng/data_etl/prefect/prefect_gitlab/repositories.py", line 202, in get_directory
    raise OSError(f"Failed to pull from remote:\n {err_stream.read()}")
OSError: Failed to pull from remote:
Cloning into '/tmp/tmpmn90f52sprefect'...
remote: You are not allowed to download code from this project.
fatal: unable to access 'https://gitlab.com/excelimpact/data_eng/data_etl/prefect.git/': The requested URL returned error: 403
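A hedged sketch of one possible cause, not a confirmed diagnosis: a 403 "not allowed to download code" from GitLab usually means the token lacks the read_repository scope, or the token's role cannot read that repository. Storing the token in a saved block also avoids hard-coding it in the script; "gitlab-creds" below is a hypothetical block name.

from prefect_gitlab.credentials import GitLabCredentials

# one-time setup: the token must carry the read_repository scope
GitLabCredentials(token="glpat-<token-with-read_repository-scope>").save(
    "gitlab-creds", overwrite=True
)

# then, in the deploy script, load the block instead of hard-coding the token:
credentials = GitLabCredentials.load("gitlab-creds")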
Avinash Santhanagopalan
03/26/2024, 4:22 PM
Robin Niel
03/26/2024, 4:38 PM
Paige Fuller
03/26/2024, 4:56 PM
Chris
03/26/2024, 6:04 PM
adam aditama
03/27/2024, 3:47 AM
Frost Ouyang
03/27/2024, 1:16 PM
Is there a setting in prefect.yaml that allows changing the log format to JSON? Thanks!
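As far as I can tell this is not a prefect.yaml setting, so the following is a hedged sketch under that assumption: Prefect's log format comes from its logging configuration, and one approach is a custom logging.yml (pointed to by PREFECT_LOGGING_SETTINGS_PATH) whose handlers use the JSON formatter shipped in prefect.logging.formatters, assuming it is available in the installed version.

# fragment of a custom logging.yml; export PREFECT_LOGGING_SETTINGS_PATH=/path/to/logging.yml
formatters:
  json:
    class: prefect.logging.formatters.JsonFormatter
    format: "default"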
03/27/2024, 1:30 PMfrom prefect.blocks.system import JSON
JSON(value=[1,5,7]).save("fake-json-block", overwrite=True)
the deployment triggers. The problem we are having is that most of the time we are updating these blocks directly in the UI. When that happens, I am not finding any corresponding event in the event log that I can use to trigger a deployment run. I would expect an event like prefect.block.json.updated in the event log (like I see when I update automations), but I don't see anything when I update directly in the UI. Is this an oversight/bug, or is this intended behavior?
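A hedged workaround sketch, not a claim that such block events exist: if UI edits emit nothing usable, routing block updates through a small helper that also emits a custom event gives the automation something to match on. The event name and resource id here are hypothetical.

from prefect.blocks.system import JSON
from prefect.events import emit_event

def update_json_block(name, value):
    # save the block, then emit a custom event an automation can trigger on
    JSON(value=value).save(name, overwrite=True)
    emit_event(
        event="custom.block.json.updated",  # hypothetical event name
        resource={"prefect.resource.id": f"custom.block.{name}"},  # hypothetical resource id
    )

update_json_block("fake-json-block", [1, 5, 7])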
May
03/27/2024, 1:34 PM
When I call variables.get("xx", default=None) I get <coroutine object get at 0x12ed8fe40>. How can I get the actual content of the variable? (My variable is a dictionary: {y:1, v:2}.) Thanks
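A minimal sketch, assuming the call happens in an async context: variables.get is sync-compatible, so inside an async flow or task it returns a coroutine that must be awaited; in a sync flow it returns the value directly.

from prefect import flow, variables

@flow
async def async_flow():
    value = await variables.get("xx", default=None)  # awaited -> the actual dict
    return value

@flow
def sync_flow():
    return variables.get("xx", default=None)  # sync context -> the actual dict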
03/27/2024, 3:24 PMif __name__ == "__main__":
everflow_clicks_conversions_api_to_gcp.serve(
name="everflow_deployment",
cron="0 10 * * *",
tags=["testing", "tutorial"],
description="Given a GitHub repository, logs repository statistics for that repo.",
version="tutorial/deployments",
)
my deploy code Everflow_deployment_test1.py is: if __name__ == "__main__":
from prefect import flow
from prefect_gitlab.repositories import GitLabRepository
from prefect_gitlab.credentials import GitLabCredentials
from prefect.blocks.system import Secret
if __name__ == "__main__":
flow.from_source(
source=GitLabRepository(
repository="<https://gitlab.com/excelimpact/data_eng/data_etl/prefect/repository.git>",
reference="main",
credentials=GitLabCredentials(token="glpat-******-*********")
),
entrypoint="workflow/everflow_workflow_test1.py:everflow_clicks_conversions_api_to_gcp",
).deploy(
name="Everflow-deployment-test1",
work_pool_name="my-managed-pool",
cron="0 10 * * *"
)
my gitlab-ci.yml
stages:
- deploy
deploy:
stage: deploy
image: python:3.10
before_script:
- export PREFECT_API_KEY=$PREFECT_API_KEY
#- echo $PREFECT_API_KEY
- pip install -r requirements.txt
#- |
# curl -s -H "Authorization: Bearer $PREFECT_API_KEY" "<https://api.prefect.cloud/api/me/>"
- prefect cloud login --key $PREFECT_API_KEY --workspace excelimpact/default
- prefect profile inspect
script:
- python Everflow_deployment_test1.py
# - python everflow_workflow_test1.py
only:
- main # Adjust branch as needed
tags:
- k8s-runner-excel-staging
the error is: File "/builds/excelimpact/data_eng/data_etl/prefect/prefect_gitlab/repositories.py", line 202, in get_directory
raise OSError(f"Failed to pull from remote:\n {err_stream.read()}")
OSError: Failed to pull from remote:
Cloning into '/tmp/tmpmn90f52sprefect'...
remote: You are not allowed to download code from this project.
fatal: unable to access '<https://gitlab.com/excelimpact/data_eng/data_etl/prefect.git/>': The requested URL returned error: 403
Robert Banick
03/27/2024, 4:18 PM
The docs say: "For use in a push work pool, [an AWS Credentials] block must have the region and cluster name filled out, in addition to access key and access key secret."
However, AWSCredentials objects in prefect_aws provide no obvious place to specify the cluster, as seen in the docs. Am I missing something?
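A hedged sketch of my understanding, not an authoritative answer: the prefect_aws AwsCredentials block exposes region_name plus the key pair but no cluster field; the ECS cluster is normally configured on the push work pool itself (in its base job template) rather than on the credentials block. The block name below is hypothetical.

from prefect_aws import AwsCredentials

AwsCredentials(
    aws_access_key_id="AKIA...",   # placeholder
    aws_secret_access_key="...",   # placeholder
    region_name="us-east-1",
).save("ecs-push-credentials", overwrite=True)  # hypothetical block name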
Jiaqiang Li
03/27/2024, 4:37 PM
Zach Munro
03/27/2024, 5:28 PM
import os

from prefect import flow, task
from prefect_dbt import DbtCliProfile, DbtCoreOperation, SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
from prefect.context import FlowRunContext


@flow
def trigger_dbt_data_build(client: str, warehouse: str = "COMPUTE_WH"):
    flow_run_name = FlowRunContext.get().flow_run.dict().get("name")
    credentials = SnowflakeCredentials(
        user=os.environ.get("SNOWFLAKE_USER"),
        password=os.environ.get("SNOWFLAKE_PASSWORD"),
        account=os.environ.get("SNOWFLAKE_ACCOUNT"),
        role="ACCOUNTADMIN",
    )
    connector = SnowflakeConnector(
        # schema=f"{client_name.upper()}_STG",
        schema="dbt_zmunro",
        threads=8,
        database="RAW",
        warehouse=warehouse,
        credentials=credentials,
        query_tag=f"dbt-data-build-{client}-{flow_run_name}",
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector,
        extras={
            "retry_on_database_errors": True,
            "connect_retries": 0,
            "connect_timeout": 600,
            "retry_all": False,
            "reuse_connections": False,
        },
    )
    dbt_cli_profile = DbtCliProfile(
        name="prefect-snowflake-dev",
        target="dev",
        target_configs=target_configs,
    )
    return DbtCoreOperation(
        commands=[
            f'dbt build --select +somemodel --vars \'{{"client_schema":"{client.upper()}_STG"}}\''
        ],
        dbt_cli_profile=dbt_cli_profile,
        overwrite_profiles=True,
        project_dir="/workflows/dbt/",
    ).run()
I am getting an error because the DbtCoreOperation can't find my dbt_cli_profile correctly. I am trying to follow the documentation here:
https://prefecthq.github.io/prefect-dbt/cli/credentials/#prefect_dbt.cli.credentials
That documentation doesn't say how to use this dbt_cli_profile object with the DbtCoreOperation class, though. And I am confused as to how blocks relate to all of this as well. Should I not be creating these credential objects on each flow run, and instead create them once and save them in a block? Where does this block get saved if I am not using Prefect Cloud?
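A hedged sketch of the save-once/load-per-run pattern (block names hypothetical): blocks are persisted by whichever Prefect API the client points at, so with a self-hosted server they live in that server's database, not in Prefect Cloud.

# one-time setup script, after building dbt_cli_profile as above:
dbt_cli_profile.save("snowflake-dev-dbt-profile", overwrite=True)

# inside the flow, on every run:
from prefect_dbt import DbtCliProfile, DbtCoreOperation

profile = DbtCliProfile.load("snowflake-dev-dbt-profile")
DbtCoreOperation(
    commands=["dbt build --select +somemodel"],
    dbt_cli_profile=profile,
    overwrite_profiles=True,
    project_dir="/workflows/dbt/",
).run()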
Eric Albanese
03/27/2024, 11:14 PM
Is there a way to use prefect-sqlalchemy to retrieve column heads of a table? Typically the execute command returns a properties meta that you can pull that from, but it kind of seems like the current implementation doesn't return anything when using
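A hedged sketch, assuming dropping down to the raw SQLAlchemy connection is acceptable: get_connection() on the connector yields a regular SQLAlchemy connection whose results expose .keys() with the column names. The block and table names are hypothetical.

from sqlalchemy import text
from prefect_sqlalchemy import SqlAlchemyConnector

with SqlAlchemyConnector.load("my-connector") as connector:  # hypothetical block name
    with connector.get_connection(begin=False) as connection:
        result = connection.execute(text("SELECT * FROM my_table LIMIT 0"))
        column_names = list(result.keys())  # the column heads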
Berislav Lopac
03/28/2024, 12:12 AM
00:09:42.549 | ERROR | Flow run 'humongous-squid' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/berislav/.virtualenvs/dataflows/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
    yield
  File "/home/berislav/.virtualenvs/dataflows/lib/python3.11/site-packages/httpx/_transports/default.py", line 373, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/berislav/.virtualenvs/dataflows/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "/home/berislav/.virtualenvs/dataflows/lib/python3.11/site-packages/httpcore/_async/connection_pool.py", line 196, in handle_async_request
    response = await connection.handle_async_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/berislav/.virtualenvs/dataflows/lib/python3.11/site-packages/httpcore/_async/connection.py", line 101, in handle_async_request
    return await self._connection.handle_async_request(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/berislav/.virtualenvs/dataflows/lib/python3.11/site-packages/httpcore/_async/http2.py", line 183, in handle_async_request
    raise LocalProtocolError(exc)  # pragma: nocover
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
httpcore.LocalProtocolError: 1
Any ideas?
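A hedged guess rather than a diagnosis: the failure is inside httpcore's HTTP/2 code path, and Prefect has a client setting to fall back to HTTP/1.1 (assuming PREFECT_API_ENABLE_HTTP2 exists in the installed version), which may sidestep it.

prefect config set PREFECT_API_ENABLE_HTTP2=false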
Jay
03/28/2024, 3:03 AM
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver
from prefect.blocks.system import Secret

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="prefect",
        password=Secret.load("tradesys-db-password").get(),
        host="xxx.xxx.xxx.xxx",
        port=5432,
        database="prefect",
    )
)
connector.save("tradesys-db-connector")
Running this to save my connector keeps getting a 403 error. I am self-hosted, with Prefect running in Docker on my server, and I am running this script from my laptop. Secret.load() seems to be working, and I have been using my server with other scripts, so it seems to just be an issue with this:
prefect.exceptions.PrefectHTTPStatusError: Client error '403 Forbidden' for url 'http://xxx.xxx.xxx.xxx:4200/api/block_types/'
Response: {'detail': 'Missing CSRF token.'}
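A hedged sketch of the likely cause: newer Prefect servers enable CSRF protection, and a client on an older version never sends the token, so upgrading the laptop's client to match the server is the clean fix. Assuming these settings exist in the versions involved, the behavior can also be toggled explicitly:

# on the laptop (client side):
prefect config set PREFECT_CLIENT_CSRF_SUPPORT_ENABLED=true
# or, to disable the check on the self-hosted server:
prefect config set PREFECT_SERVER_CSRF_PROTECTION_ENABLED=false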
May
03/28/2024, 10:12 AM
Ofek K
03/28/2024, 11:03 AM
Lior Barak
03/28/2024, 12:08 PM
Jessica Smith
03/28/2024, 12:45 PM
Gerti
03/28/2024, 2:21 PM
flow_runs = await client.read_flow_runs(
    flow_run_filter=FlowRunFilter(state={"type": {"any_": [StateType.RUNNING]}}),
    name_filter=FlowRunFilterName(id={"like_": "product"}),
)
I am having an issue, as this code doesn't work.
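A hedged sketch of what may be intended: read_flow_runs takes no name_filter argument, so the name filter belongs inside FlowRunFilter, and FlowRunFilterName takes like_ directly rather than an id={...} mapping.

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterName,
    FlowRunFilterState,
    FlowRunFilterStateType,
)
from prefect.client.schemas.objects import StateType

async def running_product_runs():
    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(any_=[StateType.RUNNING])
                ),
                name=FlowRunFilterName(like_="product"),  # name filter lives inside FlowRunFilter
            )
        )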
Joe Uhm
03/28/2024, 3:55 PM
Jiaqiang Li
03/28/2024, 6:05 PM
I got a ModuleNotFoundError caused by the missing module 'gitlab' in the deployment environment. However, I definitely installed python-gitlab in the virtual environment.
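A hedged sketch, assuming the deployment targets a Prefect managed work pool: the flow runs in the pool's environment, not in the local virtualenv, so the package has to be declared on the deployment, e.g. via the pip_packages job variable.

from prefect import flow

# same from_source call as in the earlier deploy script; only job_variables is new here
flow.from_source(
    source="https://gitlab.com/excelimpact/data_eng/data_etl/prefect/repository.git",
    entrypoint="workflow/everflow_workflow_test1.py:everflow_clicks_conversions_api_to_gcp",
).deploy(
    name="Everflow-deployment-test1",
    work_pool_name="my-managed-pool",
    job_variables={"pip_packages": ["python-gitlab", "prefect-gitlab"]},
)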
Charles Leung
03/28/2024, 10:31 PM
Samuel Hinton
03/29/2024, 8:23 AM
We're trying to use the quote annotation. This doesn't seem to work for us, as we're using a scheduler and so we get Futures instead of objects, and wrapping quote around a future and then trying to access the data in the future (the same way as without the quote) by using task_future.data raises:
AttributeError: 'PrefectFuture' object has no attribute 'data'
Does anyone know how to combine quote and futures where you launch tasks via submit?
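A hedged sketch of one way to combine the two: resolve the future with .result() first (PrefectFuture exposes .result() and .wait(), not .data), then wrap the concrete value in quote before passing it on. Task names here are placeholders.

from prefect import flow, task
from prefect.utilities.annotations import quote

@task
def produce():
    return {"rows": [1, 2, 3]}

@task
def consume(payload):
    return payload

@flow
def pipeline():
    fut = produce.submit()
    value = fut.result()           # block until the future resolves
    consume.submit(quote(value))   # quote the concrete object, not the future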
Samuel Hinton
03/29/2024, 8:40 AM
Missing CSRF token
...
Is this a new feature or something I need to read up on? Ah, I guess it's from https://github.com/PrefectHQ/prefect/pull/12377