Andy Dienes
04/13/2023, 6:46 PM

dammy arinde
04/13/2023, 6:53 PM

Deceivious
04/13/2023, 7:44 PM

Adam
04/13/2023, 8:56 PM

Maikel Penz
04/13/2023, 11:35 PM
 ___ ___ ___ ___ ___ ___ _____     _   ___ ___ _  _ _____
| _ \ _ \ __| __| __/ __|_   _|   /_\ / __| __| \| |_   _|
|  _/   / _|| _|| _| (__  | |    / _ \ (_ | _|| .` | | |
|_| |_|_\___|_| |___\___| |_|   /_/ \_\___|___|_|\_| |_|
Agent started! Looking for work from queue(s): infra-dev-plexflow-2...
23:25:42.455 | INFO | prefect.agent - Submitting flow run '69af3c9b-f22b-42db-a0a5-21595ec5408a'
23:25:44.631 | INFO | prefect.infrastructure.kubernetes-job - Job 'flow-infra-data-engineering-infra-dev-96hss': Pod has status 'Pending'.
23:25:44.705 | INFO | prefect.agent - Completed submission of flow run '69af3c9b-f22b-42db-a0a5-21595ec5408a'
23:26:44.628 | ERROR | prefect.infrastructure.kubernetes-job - Job 'flow-infra-data-engineering-infra-dev-96hss': Pod never started.
23:26:44.792 | INFO | prefect.agent - Reported flow run '69af3c9b-f22b-42db-a0a5-21595ec5408a' as crashed: Flow run infrastructure exited with non-zero status code -1.
.. and these are the logs from the job pod..
kubectl logs flow-infra-data-engineering-infra-dev-jhqhm-74k5l --follow
/usr/local/lib/python3.8/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
warn(RuntimeWarning(msg))
23:31:09.701 | INFO | Flow run 'economic-jerboa' - Downloading flow code from storage at ''
23:36:17.233 | WARNING | aiobotocore.credentials - Refreshing temporary credentials failed during mandatory refresh period.
Traceback (most recent call last):
...
..
..
raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "https://sts.eu-west-1.amazonaws.com/"
.. it gets stuck trying to download the code. The role I have assigned to the cluster has S3 full access.
My S3 Block has this configuration:
Bucket Path: <bucket-name>
AWS Access Key Id: None
AWS Secret Access Key: None
There seem to be multiple problems but..
1. Why does the flow fail right after it starts with "Pod never started."?
2. And why can it not pull the code from S3? The log shows "Downloading flow code from storage at ''", which is empty.
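For reference, a minimal sketch of the S3 block as described above (the block name here is hypothetical). With both keys left as None, the underlying s3fs client falls back to the pod's IAM role, which requires working egress to STS -- the exact call that times out in the traceback:

from prefect.filesystems import S3

# Mirrors the block config above: no explicit keys, so credentials are
# resolved from the environment / IAM role via STS at runtime.
s3_block = S3(bucket_path="<bucket-name>")  # placeholder kept from the post
s3_block.save("infra-dev-storage", overwrite=True)  # hypothetical block name

# Deployments then reference the saved block:
# storage = S3.load("infra-dev-storage")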
Choenden Kyirong
04/14/2023, 5:18 AM
@task
def task(x: str, y: int):
    ...

@flow
def flow():
    list_1, list_2 = ['a', 'b'], [1, 2]
    for input_1, input_2 in zip(list_1, list_2):
        task(input_1, input_2)
Would this be done via:
task.map(list_1, list_2)
Or will Prefect iterate only one of the parameters?
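For reference, a minimal sketch of how mapping over two iterables behaves in Prefect 2 (the task and flow names here are hypothetical): .map iterates the iterables element-wise, and unmapped() pins an argument that should stay constant:

from prefect import flow, task, unmapped

@task
def combine(x: str, y: int) -> str:
    return f"{x}-{y}"

@flow
def mapped_flow():
    letters, numbers = ["a", "b"], [1, 2]
    # Maps element-wise over both iterables: ("a", 1), then ("b", 2)
    combine.map(letters, numbers)
    # To keep one argument fixed across all mapped runs, wrap it:
    combine.map(letters, unmapped(0))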
ale
04/14/2023, 7:25 AM
When cancelling a flow run via client.cancel_flow_run, it seems the ECS task of the flow run is not stopped and stays in the Running state indefinitely.
Has anyone experienced such behaviour?
Is this expected or not?
Thanks!

davzucky
04/14/2023, 7:41 AM

David Salgado
04/14/2023, 8:08 AM

Emma Rizzi
04/14/2023, 8:19 AM
Is it possible to push the image with push:True to a private registry?
Right now, my flow is built and pushed, and the flow.py is inside the docker image. I tried to configure the pull step with prefect.projects.steps.set_working_directory but I get
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flow'
It seems that the pull step is executed outside of the docker container? My code in thread.

Tanishq Hooda
04/14/2023, 10:02 AM
We use skip_upload=True here (in the deployment below), which we set because our S3 bucket policy requires encryption to be set while uploading to S3, so we copy the flow code using the aws s3 cp command and set the --sse AES256 parameter there. I'm wondering whether this is something that can be supported in Prefect in the future, or if it is already supported, what parameter do I need to add? Thanks!
deployment = Deployment.build_from_flow(
    flow=flow,
    name=FLOW_NAME,
    parameters={},
    infra_overrides={
        "image": SHARED_CONFIG.value["prefect_image_name"],
        "service_account_name": "sa",
        "image_pull_policy": "Always",
        "namespace": "nspc",
    },
    infrastructure={"block_type_slug": "kubernetes-job"},
    work_queue_name="k8s",
    storage=storage,
    path=FLOW_NAME,
    skip_upload=True,
)
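One possible direction for the encryption question above, assuming the fsspec-backed filesystem block forwards extra settings through to s3fs (an untested sketch; the bucket and block name are hypothetical):

from prefect.filesystems import RemoteFileSystem

# Assumption: RemoteFileSystem passes `settings` through to the underlying
# fsspec/s3fs filesystem, so every upload carries the SSE header.
storage = RemoteFileSystem(
    basepath="s3://my-bucket/flows",
    settings={"s3_additional_kwargs": {"ServerSideEncryption": "AES256"}},
)
storage.save("sse-storage", overwrite=True)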
Scott Cressi
04/14/2023, 2:14 PM

S
04/14/2023, 2:45 PM
In the image prefecthq/prefect:2-python3.8, the Python version is 3.8.16 (ENV PYTHON_VERSION=3.8.16) -- unfortunately, 3.8.16 breaks my code. I need to run 3.8.0 through the docker image. Does anyone have thoughts on how I can get the base image set back to 3.8.0?
Thx much!

Jens
04/14/2023, 3:34 PM
async def deploy_ki_pipeline_controller():
    logger = get_run_logger()
    async with get_client() as client:
        flow_id: UUID = await client.create_flow(infere_pipeline_handler_flow)
        logger.info(f"Flow infere_pipeline_handler_flow was created successfully with UUID {flow_id}")
        params = {
            "watch_folder_path": "test",
            "dead_letter_path": "test/dead_letter",
            "on_success_move_path": "test/bearbeitet",
            "lego_api_url": "test/annehmen",
            "lego_api_user": "test",
            "log2File": False,
            "logging_file_path": "test",
            "dry_run_api": False,
            "dry_run_clean": False,
            "delete_on_success": True,
            "create_images": False,
            "extended_logs": False,
            "extended_anonym_error_logs": True,
        }
        deployment_id: UUID = await client.create_deployment(
            flow_id=flow_id,
            name="infere_pipeline_handler_deployment",
            storage_document_id="5bf39961-0f81-4a4b-9751-2d7e132066a8",
            parameters=params,
            work_pool_name="first-worker-pool",
            work_queue_name="test-queue-master",
            parameter_openapi_schema=params,
            entrypoint="prefect_module/flows/infere_pipeline_handler_flow.py:infere_pipeline_handler_flow",
        )
All works fine: the flow and the deployment are created and I can access them via the Prefect UI. But the parameters for the deployment are empty.
Is there a step I'm missing, or am I doing something wrong here?
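One hedged guess at the empty-parameters issue above: parameter_openapi_schema is being given the dict of parameter values, while the UI renders the flow's generated parameter schema. A sketch using prefect.utilities.callables.parameter_schema:

from prefect.utilities.callables import parameter_schema

# Build the OpenAPI schema from the flow's signature instead of reusing
# the dict of concrete values.
schema = parameter_schema(infere_pipeline_handler_flow)

# ...and inside the async function above, pass it along:
#     parameter_openapi_schema=schema.dict(),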
Jiri Klein
04/14/2023, 3:35 PM
When I run prefect deployment inspect flow_name/deployment_name for my deployment, I can see the parameters in the returned JSON.
Additionally, this means that I can’t override the Deployment parameters for ad-hoc runs of my flows.
Is this an expected behaviour in any of the Prefect Cloud UIs?
Question also posted in #prefect-ui.

Andy Dienes
04/14/2023, 5:04 PM
Is there a way to pass an infrastructure block to prefect deploy? prefect deployment build has the -ib flag, but prefect deploy does not.

Josh Paulin
04/14/2023, 5:17 PM

Gabriel Santos
04/14/2023, 7:21 PM
State message: Flow could not be retrieved from deployment. subprocess.CalledProcessError: Command '['git', 'clone', 'git@github.com:Oicoteam/prefect.git', '-b', 'main', '--depth', '1']' returned non-zero exit status 128.
Leela Surya Teja Mangamuri
04/14/2023, 7:25 PM

Abhinav Chordia
04/14/2023, 7:43 PM

Federico Zambelli
04/14/2023, 7:55 PM
I'm trying to understand how Task inputs works. I followed this example here and I still don't get it.
Can someone ELI5 it to me?
I don't get how certain tasks show inputs, but others don't (see screens).

Adam
04/14/2023, 8:56 PM
pull:
  - prefect.projects.steps.git_clone_project:
      repository: https://github.com/comp_rep/bi_etl.git
      branch: DATA-1425-docker-project-deploy
      access_token: "{{ prefect.blocks.github-credentials.git-creds }}"
Abhinav Chordia
04/14/2023, 9:29 PM
The prefect-agents are using the same URL and have come up correctly.
prefect.exceptions.PrefectHTTPStatusError: Client error '405 Method Not Allowed' for url 'http://172.28.36.138:4200/api/work_pools/'
Leon Kozlowski
04/15/2023, 3:45 AM
Does anyone have experience with the trigger_dbt_cli_command pre-built task?
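For reference, minimal usage of that pre-built task from prefect-dbt looks roughly like this (the command and flow name are hypothetical, and profiles.yml is assumed to exist in its default location):

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def run_dbt():
    # Shells out to dbt using the ambient profiles.yml
    trigger_dbt_cli_command(command="dbt run")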
David Flores
04/15/2023, 7:00 AM
import os

from prefect import flow, task, get_run_logger
from minio import Minio

@task
def bucket_creation(bucket_name: str = "name-entries"):
    logger = get_run_logger()
    logger.info(f"Creating bucket name: [bold red]{bucket_name} ==> MINIO_ENDPOINT: {os.getenv('MINIO_ENDPOINT')}[/]")
    client = Minio(
        ...
The above logger.info is using Rich’s markup, and these are the outputs I have on the Prefect Server flow run and at the agent level.
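On the markup question: by default Prefect prints Rich tags literally rather than rendering them. Assuming this Prefect 2 version exposes the PREFECT_LOGGING_MARKUP setting, a sketch of enabling it (setting the PREFECT_LOGGING_MARKUP=True environment variable before startup may be more reliable, since logging is configured early):

from prefect.settings import PREFECT_LOGGING_MARKUP, temporary_settings

# Assumption: with markup enabled, tags like [bold red]...[/] are rendered
# by Rich in console output instead of appearing verbatim.
with temporary_settings({PREFECT_LOGGING_MARKUP: True}):
    ...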
darshan darshan
04/15/2023, 9:56 AM

Federico Zambelli
04/15/2023, 2:13 PM
When I decorate my function with @task, I get the following error:
ValueError: signal only works in main thread of the main interpreter
This is, from what I understand, due to the fact that pmaw is configured to listen for a SIGKILL from the CLI and gracefully interrupt the process, which I guess can't happen if it's running in a separate thread.
The moment I remove the @task decorator from that function, I can execute it normally inside a flow, but I obviously lose all the advantages of tasks, such as caching and whatnot.
For the record, here's the function I'm talking about:
from datetime import datetime

from pmaw import PushshiftAPI

def get_submission_ids(start_date: str, end_date: str, subreddit: str):
    reddit = get_reddit_client()  # Reddit client from `praw`
    api_praw = PushshiftAPI(praw=reddit)

    start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
    start_date_ts = int(start_datetime.timestamp())
    end_date_ts = int(datetime.strptime(end_date, "%Y-%m-%d").timestamp())
    search_window_days = (datetime.today() - start_datetime).days

    # Note: pmaw's search_submissions is synchronous, so no `await` in a sync def
    submissions = api_praw.search_submissions(
        subreddit=subreddit,
        after=start_date_ts,
        until=end_date_ts,
        search_window=search_window_days,
    )
    return [f'{SUBMISSION}_{sub["id"]}' for sub in submissions]
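One workaround worth testing for the signal error above, assuming the default concurrent runner is what moves task code off the main thread: the sequential task runner, which runs tasks one at a time in the flow's own thread (a sketch; the task and flow names are hypothetical, and whether this lands on the main interpreter thread depends on the engine version):

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def fetch_ids(start_date: str, end_date: str, subreddit: str):
    # Would call the pmaw-based function above
    ...

@flow(task_runner=SequentialTaskRunner())
def reddit_flow():
    fetch_ids("2023-01-01", "2023-02-01", "dataengineering")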
kent
04/17/2023, 6:04 AM

Abhishek Mitra
04/17/2023, 10:19 AM
I have a question about LambdaInvoke in Prefect v1.
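For reference, basic usage of that task from Prefect 1's AWS task library looks roughly like this (the function name and payload are hypothetical; credentials come from the usual boto3 lookup chain):

from prefect import Flow
from prefect.tasks.aws.lambda_function import LambdaInvoke

invoke = LambdaInvoke(function_name="my-function", payload='{"key": "value"}')

with Flow("lambda-flow") as flow:
    result = invoke()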