Tobias
11/11/2022, 8:25 AMfrom prefect_aws import MinIOCredentials
3. I can also see that adding Secrets to Prefect Cloud is an option in 1.0, but this seems not to be possible in 2.0?Thomas Fredriksen
11/11/2022, 9:39 AM@asynccontextmanager
async def concurrency_limit(task: Task, limit: int = 10) -> Task:
context = get_run_context()
tag = f"{context.flow_run.id}-{task.name}"
client = get_client()
await client.create_concurrency_limit(tag, limit)
decorated_task = task.with_options(tags=[tag])
try:
yield decorated_task
finally:
await client.delete_concurrency_limit_by_tag(tag)
Manuel Garrido Peña
11/11/2022, 10:38 AMTibs
11/11/2022, 11:33 AMChristian Juhl
11/11/2022, 12:30 PMfrom prefect import task, flow, get_run_logger
@task
def square_number(number):
return number ** 2
@flow
def my_flow():
squared_numbers = []
for i in range(5):
result = square_number.submit(i)
squared_numbers.append(result.result())
return squared_numbers
if __name__ == '__main__':
output = my_flow()
print(output)
In this example, the next task is not submitted until the previous completes (they don't run concurrently), but if I move append(result.result()) outside the for loop, only the result from the last task is returned.
Thanks!Kelvin DeCosta
11/11/2022, 1:32 PMECSTask
block uses the boto3
ecs
client to call run_task
with some arguments that it derives based on the input attributes.
The agent log, if set with DEBUG
log level, shows the exact arguments that are passed to this function.
I figured that since the failure happens almost instantly after this message, it had to be this call that was causing the fatal error.
Fortunately, when I emulated the call via a python script, I ran into this error:
botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: Some tags contain invalid characters. Valid characters: UTF-8 letters, spaces, numbers and _ . / = + - : @.
So, if you have deployments that use the ECSTask
infrastructure block, make sure you only use the following characters in the flow and deployment names: UTF-8 letters, spaces, numbers and _ . / = + - : @
I'd rather not restrict flow / deployment names, so if there are workarounds to this problem, I'd really appreciate it.Oscar Björhn
11/11/2022, 1:48 PMpath: /opt/prefect/flows
entrypoint: orchestration/flows/test_curated.py:default
Jorge Luis Tudela Gonzalez de Riancho
11/11/2022, 2:25 PMAshley Felber
11/11/2022, 2:41 PMManuel Garrido Peña
11/11/2022, 3:09 PMStefano Cascavilla
11/11/2022, 3:42 PMprefect server start
command.
I'm using the 0.15.6 Prefect version and, suddendly, when I try to start the server, an error occurs when hasura tries to create the pgcrypto extension on PostgreSQL.
This is the error:
postgres_1 | 2022-11-11 15:40:49.418 UTC [84] ERROR: duplicate key value violates unique constraint "pg_extension_name_index"
postgres_1 | 2022-11-11 15:40:49.418 UTC [84] DETAIL: Key (extname)=(pgcrypto) already exists.
postgres_1 | 2022-11-11 15:40:49.418 UTC [84] STATEMENT: CREATE EXTENSION IF NOT EXISTS pgcrypto SCHEMA public
It's strange because I've used prefect until yesterday and I've never had this issue
Can anybody help me on this?Dan Wise
11/11/2022, 5:23 PMjack
11/11/2022, 8:51 PM--api
when starting the agent?Kishan
11/11/2022, 9:26 PMNo module named prefect.engine.__main__; 'prefect.engine' is a package and cannot be directly executed
. Anyone know what this means?
Also, for additional context, here are the storage and infrastructure blocks I set up.
gcs_block = GCS.load("gcs-block")
infra_block = KubernetesJob(
image="us-east1-docker.pkg.dev/{MY PROJECT}/prefect-agents/{MY IMAGE}",
env=dict(PREFECT_LOGGING_LEVEL="DEBUG"),
)
Tayyab Iqbal
11/12/2022, 3:02 AMprefect-server
on a centOS 7 Virtual Machine with following versions:
Python and pip libraries:
Python 3.9.10
prefect 1.4.0
prefect-server 2022.9.23+15.g356da2d
docker 6.0.1
docker-compose 1.29.2
Docker cli version: Docker version 20.10.21, build baeda1f
Docker Compose version: Docker Compose version v2.12.2
I am unable to run the prefect server ui under these environments, startup logs as result of prefect server start --expose
are attached
Following is the curl output even if I do from localhost:
[root@prefect-etl-vm-7 ~]# curl "<http://127.0.0.1:8080/>"
curl: (56) Recv failure: Connection reset by peer
Funny thing is I am running the same environment in a mac with just a docker desktop difference and it works on my local system.
Any kind of help is much appreciated.Lee Mendelowitz
11/12/2022, 2:28 PMcreate_flow_run_from_deployment
. Thanks!Russell Brooks
11/14/2022, 1:30 AMmerlin
11/14/2022, 5:02 AMprefect_sqlalchemy.database.sqlalchemy_execute
to send a SQL statement to a DB. But I can do that with with the sqlalchemy python library directly and write flows around that.
I must be missing the point?Rajeshwar Agrawal
11/14/2022, 5:37 AMRunning
and does not changes state ever
2. If the work queue is started again, the flow-runs which were supposedly still running do not continue to run and are stuck in Running
state.
3. The work queue does not accept any other pending flow-runs after restarting, and its status remains Unhealthy
The expected behaviour is that once the agent is terminated, the associated Running
flow-runs are marked as Cancelled
or a similar state. Further, once the agent associated with work queue comes back up, it should be able to pickup new flow-runs submitted with its label
Steps to reproduce
1. Start a local prefect orion server
2. Create a work queue test
with parallelism 1
3. Build and deploy the attached flow log_flow.py
4. Submit 20 jobs at once repeat 20 prefect deployment run 'log-flow/log-flow'
5. Terminate the work queue process (close the terminate which started the work queue)
Observe that work queue status changes to unhealthy
however 1 flow runs that started running still show Running
state. Further on restarting the queue, the interrupted flow-run does not resume and no new pending flow runs are submitted to the work queueEden
11/14/2022, 7:30 AMpy any_flow.py
successfully and insert data into BQ]
The problem I encounter is that failed to run a flow after the deployment.
Make it easier. I, then, follow the deployment, HERE_FROM_NATE, to create a very simple job.
After clicking running a flow for the deployment foo
, it shows error about connection issue.
07:17:34.363 | DEBUG | prefect.client - Connecting to API at │
│ <http://127.0.0.1:4200/api/> │
│ 07:17:35.873 | ERROR | prefect.engine - Engine execution of flow run │
│ 'd533fb3e-d96f-4774-ad53-bac47071f3c5' exited with unexpected exception
What I can think about is that the agent runs but it should not be <http://127.0.0.1:4200/api/>
for the agents to do the connection. But,
1. how could I set this? FQDN? public endpoint?
2. In where? Deployment?
Appreciate any help here, thx 🙇🏻♂️Steven Wilber
11/14/2022, 10:10 AMhttpx.HTTPStatusError: Client error '401 Unauthorized' for url '<http://localhost:8000/api/v1/health/>'
But I can check that url and it works fine and returns:
{"available":true}
Any help is much appreciated. Thanks.Stéphan Taljaard
11/14/2022, 1:28 PMMiremad Aghili
11/14/2022, 4:15 PMChristian Juhl
11/14/2022, 4:31 PMfrom prefect import task, flow
numbers = {
'a': 1,
'b': 2,
'c': 3,
'd': 4
}
@task
def square_number(number):
return number ** 2
@flow
def my_flow():
squared_numbers = square_number.with_options(tags=numbers.keys()).map(number=numbers.values())
return squared_numbers
if __name__ == '__main__':
output = my_flow()
Vishnu Duggirala
11/14/2022, 4:36 PMBlake Stefansen
11/14/2022, 5:22 PMflow_run
names are more than 63 characters
, our agent fails to create a deployment run using k8 jobs because it can't create a job label
Will post agent log in threadAmey Desai
11/14/2022, 6:09 PMalex
11/14/2022, 7:34 PMlogger = logging.getLogger("mylogs")
<http://logger.info|logger.info>("msg")
+
PREFECT__LOGGING__EXTRA_LOGGERS="['mylogs']"
Logs from my main thread are logged but not from any additional threads I am using.Madison Schott
11/14/2022, 9:51 PMSTORAGE = Docker(registry_url='<http://ecr.us-west-2.amazonaws.com/|ecr.us-west-2.amazonaws.com/>',
image_name='prefect-flows',
dockerfile='dbt_snowflake/DockerFile')
RUN_CONFIG = ECSRun(run_task_kwargs={'cluster': 'prefect-prod'},
env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
execution_role_arn='xx',
labels=['ecs-agent', 'prod', 'winc'])
Heather DeHaven
11/14/2022, 10:07 PM