Add io_uring for batching PROCS_PRIO cmd

Integrate io_uring into LMKD to batch the read and write system calls
needed to register processes and adjust their OOM scores.

Test: atest lmkd_tests
Bug: 325525024
Change-Id: I339be2b6f569189519e0e11d07cd6d7d1cf2566d
Signed-off-by: Carlos Galo <carlosgalo@google.com>
Carlos Galo 2024-05-09 10:46:49 +00:00
parent 2f00c03379
commit af79337d51
2 changed files with 187 additions and 2 deletions
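
For context, the pattern this change applies: instead of issuing one
read(2) or write(2) syscall per process, the handler queues one SQE per
file descriptor, submits the whole batch with a single io_uring_enter(2),
then reaps one CQE per operation. A minimal standalone sketch of that
pattern (illustrative only, not LMKD code; the paths, buffer sizes, and
two-entry batch are arbitrary, and error handling is trimmed):

#include <fcntl.h>
#include <liburing.h>
#include <stdio.h>
#include <unistd.h>

int main() {
    struct io_uring ring;
    if (io_uring_queue_init(8, &ring, 0) < 0) return 1;

    const char* paths[2] = {"/proc/self/status", "/proc/1/status"};
    char buf[2][256];
    int fds[2];
    for (int i = 0; i < 2; i++) {
        fds[i] = open(paths[i], O_RDONLY | O_CLOEXEC);
        struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
        io_uring_prep_read(sqe, fds[i], buf[i], sizeof(buf[i]) - 1, 0);
        sqe->user_data = i; /* maps the completion back to fds/buf */
    }
    io_uring_submit(&ring); /* one syscall submits both reads */

    for (int i = 0; i < 2; i++) {
        struct io_uring_cqe* cqe;
        if (io_uring_wait_cqe(&ring, &cqe) < 0) break;
        if (cqe->res >= 0) {
            buf[cqe->user_data][cqe->res] = '\0'; /* NUL-terminate the read */
            printf("%llu: read %d bytes\n",
                   (unsigned long long)cqe->user_data, cqe->res);
        }
        close(fds[cqe->user_data]);
        io_uring_cqe_seen(&ring, cqe); /* mark the CQE consumed */
    }
    io_uring_queue_exit(&ring);
    return 0;
}

The change below reuses the same ring for a follow-up write batch, so the
read-then-write flow costs two submissions total rather than two syscalls
per process.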

Android.bp

@@ -54,7 +54,9 @@ cc_binary {
static_libs: [
"libstatslogc",
"liblmkd_utils",
"liburing",
],
include_dirs: ["bionic/libc/kernel"],
header_libs: [
"bpf_headers",
],

lmkd.cpp

@@ -40,10 +40,12 @@
#include <shared_mutex>
#include <vector>
#include <bpf/KernelUtils.h>
#include <bpf/WaitForProgsLoaded.h>
#include <cutils/properties.h>
#include <cutils/sockets.h>
#include <liblmkd_utils.h>
#include <liburing.h>
#include <lmkd.h>
#include <lmkd_hooks.h>
#include <log/log.h>
@@ -206,6 +208,11 @@ static std::unique_ptr<android::bpf::memevents::MemEventListener> memevent_liste
static struct timespec direct_reclaim_start_tm;
static struct timespec kswapd_start_tm;
/* io_uring for LMK_PROCS_PRIO */
static struct io_uring lmk_io_uring_ring;
/* IORING_OP_READ/WRITE opcodes were only introduced in kernel 5.6 */
static const bool isIoUringSupported = android::bpf::isAtLeastKernelVersion(5, 6, 0);
static int level_oomadj[VMPRESS_LEVEL_COUNT];
static int mpevfd[VMPRESS_LEVEL_COUNT] = { -1, -1, -1 };
static bool pidfd_supported;
@@ -1479,6 +1486,180 @@ static void cmd_target(int ntargets, LMKD_CTRL_PACKET packet) {
}
}
static void handle_io_uring_procs_prio(const struct lmk_procs_prio& params, const int procs_count,
struct ucred* cred) {
struct io_uring_sqe* sqe;
struct io_uring_cqe* cqe;
int fds[PROCS_PRIO_MAX_RECORD_COUNT];
    char buffers[PROCS_PRIO_MAX_RECORD_COUNT]
                [256]; /* read /proc/<pid>/status, then reused to write oom_score_adj */
char path[PROCFS_PATH_MAX];
int64_t tgid;
int ret;
int num_requests = 0;
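    /* Size the ring for the maximum batch: one SQE per possible process entry */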
ret = io_uring_queue_init(PROCS_PRIO_MAX_RECORD_COUNT, &lmk_io_uring_ring, 0);
if (ret) {
ALOGE("LMK_PROCS_PRIO failed to setup io_uring ring: %s", strerror(-ret));
return;
}
std::fill_n(fds, PROCS_PRIO_MAX_RECORD_COUNT, -1);
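    /* Phase 1: queue one async read of /proc/<pid>/status per valid entry */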
for (int i = 0; i < procs_count; i++) {
if (params.procs[i].oomadj < OOM_SCORE_ADJ_MIN ||
params.procs[i].oomadj > OOM_SCORE_ADJ_MAX)
ALOGW("Skipping invalid PROCS_PRIO oomadj=%d for pid=%d", params.procs[i].oomadj,
params.procs[i].pid);
else if (params.procs[i].ptype < PROC_TYPE_FIRST ||
params.procs[i].ptype >= PROC_TYPE_COUNT)
ALOGW("Skipping invalid PROCS_PRIO pid=%d for invalid process type arg %d",
params.procs[i].pid, params.procs[i].ptype);
else {
snprintf(path, PROCFS_PATH_MAX, "/proc/%d/status", params.procs[i].pid);
fds[i] = open(path, O_RDONLY | O_CLOEXEC);
if (fds[i] < 0) continue;
sqe = io_uring_get_sqe(&lmk_io_uring_ring);
if (!sqe) {
ALOGE("LMK_PROCS_PRIO skipping pid (%d), failed to get SQE for read proc status",
params.procs[i].pid);
close(fds[i]);
fds[i] = -1;
continue;
}
            /* Read at most sizeof - 1 bytes, leaving room for a NUL on completion */
            io_uring_prep_read(sqe, fds[i], buffers[i], sizeof(buffers[i]) - 1, 0);
sqe->user_data = i;
num_requests++;
}
}
if (num_requests == 0) {
ALOGW("LMK_PROCS_PRIO has no read proc status requests to process");
goto err;
}
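    /* One submission covers all queued reads */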
ret = io_uring_submit(&lmk_io_uring_ring);
    if (ret != num_requests) {
        ALOGE("Error submitting batched proc status reads: %s",
              ret < 0 ? strerror(-ret) : "short submission");
        goto err;
    }
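    /* Reap one completion per submitted read; user_data maps back to procs[] */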
for (int i = 0; i < num_requests; i++) {
ret = TEMP_FAILURE_RETRY(io_uring_wait_cqe(&lmk_io_uring_ring, &cqe));
if (ret < 0 || !cqe) {
ALOGE("Failed to get CQE, in LMK_PROCS_PRIO, for read batching: %s", strerror(-ret));
goto err;
}
        if (cqe->res < 0) {
            ALOGE("Error in LMK_PROCS_PRIO for async proc status read operation: %s",
                  strerror(-cqe->res));
            io_uring_cqe_seen(&lmk_io_uring_ring, cqe);
            continue;
        }
        /* user_data is unsigned; only the upper bound needs checking */
        if (cqe->user_data >= static_cast<uint64_t>(procs_count)) {
            ALOGE("Invalid LMK_PROCS_PRIO CQE read data: %llu", cqe->user_data);
            io_uring_cqe_seen(&lmk_io_uring_ring, cqe);
            continue;
        }
        const int procs_idx = cqe->user_data;
        buffers[procs_idx][cqe->res] = '\0'; /* reads were capped at sizeof - 1 */
        close(fds[procs_idx]);
        fds[procs_idx] = -1;
        io_uring_cqe_seen(&lmk_io_uring_ring, cqe);
        if (parse_status_tag(buffers[procs_idx], PROC_STATUS_TGID_FIELD, &tgid) &&
tgid != params.procs[procs_idx].pid) {
ALOGE("Attempt to register a task that is not a thread group leader "
"(tid %d, tgid %" PRId64 ")",
params.procs[procs_idx].pid, tgid);
continue;
}
/* Open write file to prepare for write batch */
snprintf(path, sizeof(path), "/proc/%d/oom_score_adj", params.procs[procs_idx].pid);
fds[procs_idx] = open(path, O_WRONLY | O_CLOEXEC);
if (fds[procs_idx] < 0) {
ALOGW("Failed to open %s; errno=%d: process %d might have been killed, skipping for "
"LMK_PROCS_PRIO",
path, errno, params.procs[procs_idx].pid);
continue;
}
}
/* Prepare to write the new OOM score */
num_requests = 0;
for (int i = 0; i < procs_count; i++) {
if (fds[i] < 0) continue;
/* gid containing AID_READPROC required */
/* CAP_SYS_RESOURCE required */
/* CAP_DAC_OVERRIDE required */
snprintf(buffers[i], sizeof(buffers[i]), "%d", params.procs[i].oomadj);
sqe = io_uring_get_sqe(&lmk_io_uring_ring);
if (!sqe) {
ALOGE("LMK_PROCS_PRIO skipping pid (%d), failed to get SQE for write",
params.procs[i].pid);
close(fds[i]);
fds[i] = -1;
continue;
}
        /* Write only the formatted digits, not the whole 256-byte buffer */
        io_uring_prep_write(sqe, fds[i], buffers[i], strlen(buffers[i]), 0);
sqe->user_data = i;
num_requests++;
}
if (num_requests == 0) {
ALOGW("LMK_PROCS_PRIO has no write proc oomadj requests to process");
goto err;
}
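    /* Phase 2: submit all oom_score_adj writes as a second batch */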
ret = io_uring_submit(&lmk_io_uring_ring);
    if (ret != num_requests) {
        ALOGE("Error submitting batched oom_score_adj writes: %s",
              ret < 0 ? strerror(-ret) : "short submission");
        goto err;
    }
/* Handle async write completions for proc/<pid>/oom_score_adj */
for (int i = 0; i < num_requests; i++) {
ret = TEMP_FAILURE_RETRY(io_uring_wait_cqe(&lmk_io_uring_ring, &cqe));
if (ret < 0 || !cqe) {
ALOGE("Failed to get CQE, in LMK_PROCS_PRIO, for write batching: %s", strerror(-ret));
goto err;
}
        if (cqe->res < 0) {
            ALOGE("Error in LMK_PROCS_PRIO for async oom_score_adj write operation: %s",
                  strerror(-cqe->res));
            io_uring_cqe_seen(&lmk_io_uring_ring, cqe);
            continue;
        }
        if (cqe->user_data >= static_cast<uint64_t>(procs_count)) {
            ALOGE("Invalid LMK_PROCS_PRIO CQE write data: %llu", cqe->user_data);
            io_uring_cqe_seen(&lmk_io_uring_ring, cqe);
            continue;
        }
const int procs_idx = cqe->user_data;
close(fds[procs_idx]);
fds[procs_idx] = -1;
io_uring_cqe_seen(&lmk_io_uring_ring, cqe);
if (use_inkernel_interface) {
stats_store_taskname(params.procs[procs_idx].pid,
proc_get_name(params.procs[procs_idx].pid, path, sizeof(path)));
continue;
}
register_oom_adj_proc(params.procs[procs_idx], cred);
}
io_uring_queue_exit(&lmk_io_uring_ring);
return;
err:
for (int fd : fds)
if (fd >= 0) close(fd);
io_uring_queue_exit(&lmk_io_uring_ring);
return;
}
static void cmd_procs_prio(LMKD_CTRL_PACKET packet, const int field_count, struct ucred* cred) {
struct lmk_procs_prio params;
@@ -1488,8 +1669,10 @@ static void cmd_procs_prio(LMKD_CTRL_PACKET packet, const int field_count, struc
         return;
     }
-    for (int i = 0; i < procs_count; i++) {
-        apply_proc_prio(params.procs[i], cred);
+    if (isIoUringSupported) {
+        handle_io_uring_procs_prio(params, procs_count, cred);
+    } else {
+        for (int i = 0; i < procs_count; i++) apply_proc_prio(params.procs[i], cred);
     }
 }