I am trying to create a thread-safe queue in C using instrinsics, but i get reproduceable crashes after some time.
Are there some stupid mistakes? There must be some mistakes in the linking of the linked list items.
This should be a simple head/tail including a first/last freelist implementation....
typedef struct QueueItem {
U8 *data;
QueueItem *next;
} QueueItem;
typedef struct Queue {
U8 *memory;
memory_index size;
U32 maxCount;
volatile U32 used;
QueueItem *first;
QueueItem *last;
QueueItem *firstFree;
QueueItem *lastFree;
volatile U32 locked;
} Queue;
internal_method
void QueueInit(MemoryArena *arena, Queue *queue, U32 maxCount) {
memory_index size = maxCount * sizeof(QueueItem);
U8 *memory = PushSize(arena, size);
queue->size = size;
queue->memory = memory;
queue->maxCount = maxCount;
queue->used = 0;
queue->first = 0;
queue->firstFree = (QueueItem *)memory;
queue->locked = 0;
QueueItem *cur = (QueueItem *)memory;
QueueItem *prev = 0;
for (U32 i = 0; i < maxCount; ++i) {
cur->data = 0;
cur->next = 0;
if (i < maxCount - 1) {
QueueItem* next = cur + 1;
cur->next = next;
cur = cur->next;
}
}
queue->lastFree = (QueueItem *)(memory + sizeof(QueueItem) * (maxCount - 1));
Assert(queue->lastFree->next == 0);
}
inline B32 QueueIsFull(Queue *queue) {
U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
B32 result = used >= queue->maxCount;
return result;
}
inline B32 QueueHasItems(Queue *queue) {
U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
B32 result = used > 0;
return result;
}
inline void InterlockWait(volatile U32 *lock) {
while (AtomicCompareExchangeU32(lock, 0, 0) == 1) {
Sleep(1);
}
}
internal_method
void QueuePush(Queue *queue, U8 *data) {
// Wait and lock
InterlockWait(&queue->locked);
U32 prevLock = AtomicCompareExchangeU32(&queue->locked, 1, 0);
Assert(prevLock == 0);
// Peek item from head of the list
Assert(queue->firstFree);
QueueItem *item = queue->firstFree;
Assert(!item->data);
// Pop item from the tail of the free list
QueueItem *nextFreeItem = item->next;
queue->firstFree = nextFreeItem;
if (!queue->firstFree)
queue->lastFree = 0;
// Set data fields
item->data = data;
item->next = 0;
// Pop item from head of the list
if (queue->first == 0) {
queue->first = item;
queue->last = item;
} else {
Assert(queue->last);
Assert(queue->last->next == 0);
QueueItem *prevLast = queue->last;
queue->last = item;
prevLast->next = item;
}
// Increment used count
AtomicCompareExchangeU32(&queue->used, queue->used + 1, queue->used);
// Unlock
prevLock = AtomicCompareExchangeU32(&queue->locked, 0, 1);
Assert(prevLock == 1);
}
internal_method
U8 *QueuePop(Queue *queue) {
// Wait and lock
InterlockWait(&queue->locked);
U32 prevLock = AtomicCompareExchangeU32(&queue->locked, 1, 0);
Assert(prevLock == 0);
// Pop item from the head of the list
Assert(queue->first);
QueueItem *item = queue->first;
U8 *result = item->data;
QueueItem *next = item->next;
if (next) {
queue->first = next;
} else {
queue->last = 0;
queue->first = 0;
}
// Clear data fields
item->data = 0;
item->next = 0;
// Push item to the tail of the free list
QueueItem *lastFree = queue->lastFree;
if (lastFree) {
lastFree->next = item;
lastFree = item;
} else {
queue->firstFree = item;
queue->lastFree = item;
}
// Decrement used count
AtomicCompareExchangeU32(&queue->used, queue->used - 1, queue->used);
// Unlock
prevLock = AtomicCompareExchangeU32(&queue->locked, 0, 1);
Assert(prevLock == 1);
return result;
}
I dont see the mistake.