Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aethalloc-abi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license.workspace = true
crate-type = ["cdylib"]

[features]
default = ["simple-cache"]
default = ["magazine-caching"]
magazine-caching = ["aethalloc-core/magazine"]
simple-cache = []
metrics = []
Expand Down
26 changes: 22 additions & 4 deletions aethalloc-abi/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use alloc::alloc::{GlobalAlloc, Layout};
use core::ptr::NonNull;
use core::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
use core::sync::atomic::{AtomicU64, Ordering};

use aethalloc_core::page::PageAllocator;
use aethalloc_core::size_class::round_up_pow2;
Expand All @@ -19,7 +19,9 @@ const PAGE_MASK: usize = !(PAGE_SIZE - 1);
const MAX_CACHE_SIZE: usize = 65536;
const NUM_SIZE_CLASSES: usize = 14;
const METRICS_FLUSH_THRESHOLD: usize = 4096;
#[cfg(not(feature = "magazine-caching"))]
const MAX_FREE_LIST_LENGTH: usize = 4096;
#[cfg(not(feature = "magazine-caching"))]
const GLOBAL_FREE_BATCH: usize = 128;

const MAGIC: u32 = 0xA7E8A110;
Expand Down Expand Up @@ -581,6 +583,18 @@ unsafe impl GlobalAlloc for AethAlloc {
return block.add(CACHE_HEADER_SIZE);
}

// Try swap with local free_mag for reuse
if !cache.free_mags[class].is_empty() {
core::mem::swap(&mut cache.alloc_mags[class], &mut cache.free_mags[class]);
if let Some(block) = cache.alloc_mags[class].pop() {
cache.metrics.cache_hits += 1;
cache.metrics.allocs += 1;
cache.metrics.maybe_flush();
core::ptr::write(block as *mut usize, size);
return block.add(CACHE_HEADER_SIZE);
}
}

// Try to get a full magazine from global pool
if let Some(node_ptr) = GLOBAL_MAGAZINES.get(class).pop_full() {
let node = &mut *node_ptr;
Expand Down Expand Up @@ -609,9 +623,13 @@ unsafe impl GlobalAlloc for AethAlloc {
if blocks_per_page > 1 {
if let Some(base) = PageAllocator::alloc(1) {
let base_ptr = base.as_ptr();
for i in 1..blocks_per_page {
let block_ptr = base_ptr.add(i * block_size);
let _ = cache.alloc_mags[class].push(block_ptr);
let remaining = blocks_per_page.saturating_sub(1);
if remaining > 0 {
cache.alloc_mags[class].bulk_init(
base_ptr.add(block_size),
block_size,
remaining,
);
}
core::ptr::write(base_ptr as *mut usize, size);
cache.metrics.maybe_flush();
Expand Down
26 changes: 26 additions & 0 deletions aethalloc-core/src/magazine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use core::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

pub const MAGAZINE_CAPACITY: usize = 64;
pub const NUM_SIZE_CLASSES: usize = 13;
pub const MAX_GLOBAL_MAGAZINES_PER_CLASS: usize = 8;

/// Magazine: A container for 64 memory block pointers
#[repr(C)]
Expand Down Expand Up @@ -56,6 +57,20 @@ impl Magazine {
pub fn clear(&mut self) {
self.count = 0;
}

#[inline]
pub fn bulk_init(&mut self, base: *mut u8, block_size: usize, count: usize) {
let to_add = count.min(MAGAZINE_CAPACITY - self.count);
for i in 0..to_add {
self.blocks[self.count + i] = unsafe { base.add(i * block_size) };
}
self.count += to_add;
}

#[inline]
pub fn len(&self) -> usize {
self.count
}
}

impl Default for Magazine {
Expand Down Expand Up @@ -191,6 +206,17 @@ impl GlobalMagazinePool {
}
}
}

#[inline]
pub fn full_depth(&self) -> usize {
let mut count = 0;
let mut current = self.full_head.load(Ordering::Relaxed);
while !current.is_null() && count < MAX_GLOBAL_MAGAZINES_PER_CLASS + 1 {
current = unsafe { (*current).next };
count += 1;
}
count
}
}

/// All global magazine pools (one per size class)
Expand Down
42 changes: 42 additions & 0 deletions benches/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
CC ?= gcc
CFLAGS ?= -O3 -pthread -Wall -Wextra

BENCHMARKS = packet_churn tail_latency producer_consumer fragmentation multithread_churn \
micro_burst rss_reclaim asymmetric_threads kv_store massive_alloc

all: $(BENCHMARKS)

packet_churn: packet_churn.c
$(CC) $(CFLAGS) -o $@ $<

tail_latency: tail_latency.c
$(CC) $(CFLAGS) -o $@ $<

producer_consumer: producer_consumer.c
$(CC) $(CFLAGS) -o $@ $<

fragmentation: fragmentation.c
$(CC) $(CFLAGS) -o $@ $<

multithread_churn: multithread_churn.c
$(CC) $(CFLAGS) -o $@ $<

micro_burst: micro_burst.c
$(CC) $(CFLAGS) -o $@ $<

rss_reclaim: rss_reclaim.c
$(CC) $(CFLAGS) -o $@ $<

asymmetric_threads: asymmetric_threads.c
$(CC) $(CFLAGS) -o $@ $<

kv_store: kv_store.c
$(CC) $(CFLAGS) -o $@ $<

massive_alloc: massive_alloc.c
$(CC) $(CFLAGS) -o $@ $<

clean:
rm -f $(BENCHMARKS)

.PHONY: all clean
207 changes: 207 additions & 0 deletions benches/asymmetric_threads.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <pthread.h>
#include <time.h>

#define NUM_OBJECTS 10000
#define NUM_PRODUCER_THREADS 4
#define NUM_CONSUMER_THREADS 4
#define OBJECT_SIZE 128

typedef struct {
void *data;
volatile int ready;
} Object;

typedef struct {
Object *objects;
int count;
int producer_idx;
int consumer_idx;
pthread_mutex_t lock;
pthread_cond_t not_empty;
pthread_cond_t not_full;
volatile int done;
} SharedQueue;

static uint64_t get_ns(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_sec * 1000000000ULL + ts.tv_nsec;
}

static long get_rss_kb(void) {
FILE *f = fopen("/proc/self/statm", "r");
if (!f) return -1;
long size, rss;
if (fscanf(f, "%ld %ld", &size, &rss) != 2) {
fclose(f);
return -1;
}
fclose(f);
return rss * 4;
}

static void *producer_thread(void *arg) {
SharedQueue *queue = (SharedQueue *)arg;
uint64_t alloc_time = 0;
int allocs = 0;

for (int i = 0; i < queue->count / NUM_PRODUCER_THREADS; i++) {
uint64_t t0 = get_ns();
void *obj = malloc(OBJECT_SIZE);
uint64_t t1 = get_ns();

if (!obj) continue;

memset(obj, 0xAA, OBJECT_SIZE);
alloc_time += (t1 - t0);
allocs++;

pthread_mutex_lock(&queue->lock);
while (((queue->producer_idx + 1) % queue->count) == queue->consumer_idx) {
pthread_cond_wait(&queue->not_full, &queue->lock);
}

queue->objects[queue->producer_idx].data = obj;
queue->objects[queue->producer_idx].ready = 1;
queue->producer_idx = (queue->producer_idx + 1) % queue->count;

pthread_cond_signal(&queue->not_empty);
pthread_mutex_unlock(&queue->lock);
}

return (void *)alloc_time;
}

static void *consumer_thread(void *arg) {
SharedQueue *queue = (SharedQueue *)arg;
uint64_t free_time = 0;
int frees = 0;

int consumed = 0;
int to_consume = queue->count / NUM_CONSUMER_THREADS;

while (consumed < to_consume && !queue->done) {
pthread_mutex_lock(&queue->lock);

while (queue->consumer_idx == queue->producer_idx && !queue->done) {
pthread_cond_wait(&queue->not_empty, &queue->lock);
}

if (queue->consumer_idx == queue->producer_idx) {
pthread_mutex_unlock(&queue->lock);
break;
}

void *obj = queue->objects[queue->consumer_idx].data;
queue->objects[queue->consumer_idx].ready = 0;
queue->consumer_idx = (queue->consumer_idx + 1) % queue->count;

pthread_cond_signal(&queue->not_full);
pthread_mutex_unlock(&queue->lock);

if (obj) {
uint64_t t0 = get_ns();
free(obj);
uint64_t t1 = get_ns();
free_time += (t1 - t0);
frees++;
}
consumed++;
}

return (void *)free_time;
}

int main(int argc, char **argv) {
int num_objects = NUM_OBJECTS;
int num_producers = NUM_PRODUCER_THREADS;
int num_consumers = NUM_CONSUMER_THREADS;

if (argc > 1) num_objects = atoi(argv[1]);
if (argc > 2) num_producers = atoi(argv[2]);
if (argc > 3) num_consumers = atoi(argv[3]);

long baseline_rss = get_rss_kb();

SharedQueue queue = {
.count = num_objects * 2,
.producer_idx = 0,
.consumer_idx = 0,
.done = 0
};

queue.objects = calloc(queue.count, sizeof(Object));
if (!queue.objects) {
fprintf(stderr, "Failed to allocate queue\n");
return 1;
}

pthread_mutex_init(&queue.lock, NULL);
pthread_cond_init(&queue.not_empty, NULL);
pthread_cond_init(&queue.not_full, NULL);

pthread_t producers[num_producers];
pthread_t consumers[num_consumers];

uint64_t start = get_ns();

for (int i = 0; i < num_consumers; i++) {
pthread_create(&consumers[i], NULL, consumer_thread, &queue);
}

for (int i = 0; i < num_producers; i++) {
pthread_create(&producers[i], NULL, producer_thread, &queue);
}

uint64_t total_alloc_time = 0;
uint64_t total_free_time = 0;

for (int i = 0; i < num_producers; i++) {
void *result;
pthread_join(producers[i], &result);
total_alloc_time += (uint64_t)result;
}

queue.done = 1;
pthread_cond_broadcast(&queue.not_empty);

for (int i = 0; i < num_consumers; i++) {
void *result;
pthread_join(consumers[i], &result);
total_free_time += (uint64_t)result;
}

uint64_t end = get_ns();

long peak_rss = get_rss_kb();

long final_rss = get_rss_kb();

double elapsed = (end - start) / 1e9;
int total_ops = num_objects;

printf("{\"benchmark\": \"asymmetric_threads\", ");
printf("\"config\": {\"objects\": %d, \"producers\": %d, \"consumers\": %d}, ",
num_objects, num_producers, num_consumers);
printf("\"results\": {");
printf("\"throughput_ops_per_sec\": %.0f, ", total_ops / elapsed);
printf("\"avg_alloc_ns\": %.1f, ", (double)total_alloc_time / total_ops);
printf("\"avg_free_ns\": %.1f, ", (double)total_free_time / total_ops);
printf("\"total_time_sec\": %.3f, ", elapsed);
printf("\"baseline_rss_kb\": %ld, ", baseline_rss);
printf("\"peak_rss_kb\": %ld, ", peak_rss);
printf("\"final_rss_kb\": %ld, ", final_rss);
printf("\"memory_retained_kb\": %ld", final_rss - baseline_rss);
printf("}}\n");

free(queue.objects);
pthread_mutex_destroy(&queue.lock);
pthread_cond_destroy(&queue.not_empty);
pthread_cond_destroy(&queue.not_full);

return 0;
}
Loading
Loading