C Queue Linked List madness

Started by
8 comments, last by JVG_BCN 7 years, 10 months ago

I am trying to create a thread-safe queue in C using intrinsics, but I get reproducible crashes after a while.

Are there any stupid mistakes? Something must be wrong in how the linked list items are linked together.

This should be a simple head/tail queue with a first/last free list...


typedef struct QueueItem {
	U8 *data;
	struct QueueItem *next; // in C the typedef name is not visible yet inside the struct
} QueueItem;

typedef struct Queue {
	U8 *memory;
	memory_index size;
	U32 maxCount;
	volatile U32 used;
	QueueItem *first;
	QueueItem *last;
	QueueItem *firstFree;
	QueueItem *lastFree;
	volatile U32 locked;
} Queue;

internal_method
void QueueInit(MemoryArena *arena, Queue *queue, U32 maxCount) {
	memory_index size = maxCount * sizeof(QueueItem);
	U8 *memory = PushSize(arena, size);
	queue->size = size;
	queue->memory = memory;
	queue->maxCount = maxCount;
	queue->used = 0;
	queue->first = 0;
	queue->firstFree = (QueueItem *)memory;
	queue->locked = 0;
	QueueItem *cur = (QueueItem *)memory;
	for (U32 i = 0; i < maxCount; ++i) {
		cur->data = 0;
		cur->next = 0;
		if (i < maxCount - 1) {
			QueueItem* next = cur + 1;
			cur->next = next;
			cur = cur->next;
		}
	}
	queue->lastFree = (QueueItem *)(memory + sizeof(QueueItem) * (maxCount - 1));
	Assert(queue->lastFree->next == 0);
}

inline B32 QueueIsFull(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used >= queue->maxCount;
	return result;
}

inline B32 QueueHasItems(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used > 0;
	return result;
}

inline void InterlockWait(volatile U32 *lock) {
	while (AtomicCompareExchangeU32(lock, 0, 0) == 1) {
		Sleep(1);
	}
}

internal_method
void QueuePush(Queue *queue, U8 *data) {
	// Wait and lock
	InterlockWait(&queue->locked);
	U32 prevLock = AtomicCompareExchangeU32(&queue->locked, 1, 0);
	Assert(prevLock == 0);

	// Peek item from the head of the free list
	Assert(queue->firstFree);
	QueueItem *item = queue->firstFree;
	Assert(!item->data);

	// Unlink item from the head of the free list
	QueueItem *nextFreeItem = item->next;
	queue->firstFree = nextFreeItem;
	if (!queue->firstFree)
		queue->lastFree = 0;

	// Set data fields
	item->data = data;
	item->next = 0;

	// Append item to the tail of the list
	if (queue->first == 0) {
		queue->first = item;
		queue->last = item;
	} else {
		Assert(queue->last);
		Assert(queue->last->next == 0);
		QueueItem *prevLast = queue->last;
		queue->last = item;
		prevLast->next = item;
	}

	// Increment used count
	AtomicCompareExchangeU32(&queue->used, queue->used + 1, queue->used);

	// Unlock
	prevLock = AtomicCompareExchangeU32(&queue->locked, 0, 1);
	Assert(prevLock == 1);
}

internal_method
U8 *QueuePop(Queue *queue) {
	// Wait and lock
	InterlockWait(&queue->locked);
	U32 prevLock = AtomicCompareExchangeU32(&queue->locked, 1, 0);
	Assert(prevLock == 0);

	// Pop item from the head of the list
	Assert(queue->first);
	QueueItem *item = queue->first;
	U8 *result = item->data;
	QueueItem *next = item->next;
	if (next) {
		queue->first = next;
	} else {
		queue->last = 0;
		queue->first = 0;
	}

	// Clear data fields
	item->data = 0;
	item->next = 0;

	// Push item to the tail of the free list
	QueueItem *lastFree = queue->lastFree;
	if (lastFree) {
		lastFree->next = item;
		lastFree = item;
	} else {
		queue->firstFree = item;
		queue->lastFree = item;
	}

	// Decrement used count
	AtomicCompareExchangeU32(&queue->used, queue->used - 1, queue->used);

	// Unlock
	prevLock = AtomicCompareExchangeU32(&queue->locked, 0, 1);
	Assert(prevLock == 1);

	return result;
}

I don't see the mistake.


Half asleep, I thought I had it... NOT.

Changed the following:


inline B32 QueueIsFull(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used >= queue->maxCount;
	return result;
}
 
inline B32 QueueHasItems(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used > 0;
	return result;
}

to:


inline B32 QueueIsFull(Queue *queue) {
	B32 result = queue->firstFree == 0;
	return result;
}

inline B32 QueueHasItems(Queue *queue) {
	B32 result = queue->first != 0;
	return result;
}

It still does not work :-(

Glancing at it I don't see it right off. You have a bunch of pointer following and indirection going on, so as a hunch it is likely following a null pointer or an invalid (uninitialized or released) pointer value.

To save time for those of us not wanting to run it:

* what line of your post do the reproducible crashes happen on?
* what is the exact text of the crash message?

There are all kinds of problems here regarding efficiency, indicating a deep confusion about atomics and when you should be using them.

For instance, why are you doing a CAS to test whether the queue is full? The entire test is inherently super-racy no matter how you go about it (even if you read the queue state atomically, the state can have changed by the time you return that value!), so at most you need an atomic load.
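To make that concrete, here is a minimal sketch using C11 stdatomic.h rather than the Win32 intrinsics from the thread; QueueCounts and the two functions are hypothetical names. Each check becomes a single relaxed atomic load.

```c
#include <stdatomic.h>
#include <stdbool.h>

// Hypothetical header: `used` is only changed while the queue lock is held,
// but other threads may peek at it without taking the lock.
typedef struct {
    atomic_uint used;
    unsigned capacity;
} QueueCounts;

// A relaxed atomic load is enough for a hint like this. The result can be
// stale by the time the caller acts on it however it was read, so a CAS
// buys nothing here.
static bool QueueCountsHasItems(QueueCounts *q) {
    return atomic_load_explicit(&q->used, memory_order_relaxed) > 0;
}

static bool QueueCountsIsFull(QueueCounts *q) {
    return atomic_load_explicit(&q->used, memory_order_relaxed) >= q->capacity;
}
```

Callers should treat the answer as a hint only; the real decision still has to happen under the lock.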

Why are you using atomics to modify the size at all when all of your queue operations are behind what is essentially a spinlock? If you lock the structure before mutating it, the things you're mutating don't need to be atomics.
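A minimal sketch of that idea, again in C11 with hypothetical names (LockedCounts, LockedCountsReserve): only the lock is atomic, and the counter it protects is an ordinary unsigned that is incremented with plain ++.

```c
#include <stdatomic.h>
#include <stdbool.h>

// Hypothetical header guarded by a test-and-set spinlock. Only the lock
// itself is atomic; `used` is a plain field because it is only read or
// written while the lock is held.
typedef struct {
    atomic_flag lock;
    unsigned used;
    unsigned capacity;
} LockedCounts;

static void LockedCountsInit(LockedCounts *q, unsigned capacity) {
    atomic_flag_clear(&q->lock);
    q->used = 0;
    q->capacity = capacity;
}

// Reserves one slot; returns false if the queue is already full.
static bool LockedCountsReserve(LockedCounts *q) {
    while (atomic_flag_test_and_set_explicit(&q->lock, memory_order_acquire))
        ; // spin (a real version would pause, yield or back off here)
    bool ok = q->used < q->capacity;
    if (ok)
        ++q->used; // ordinary increment: the lock already serializes writers
    atomic_flag_clear_explicit(&q->lock, memory_order_release);
    return ok;
}
```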

One possible bug is that your spinlock implementation is inherently broken.

	InterlockWait(&queue->locked);
	U32 prevLock = AtomicCompareExchangeU32(&queue->locked, 1, 0);
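
You loop until the lock looks free, then try to take it once and carry on whether or not that CAS succeeded. That's _wrong_. Major wrong. Utterly and absolutely wrong. Another thread can grab the lock between your wait loop and your CAS, and the Assert on the CAS result is the only thing standing between you and two threads in the critical section at once.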

The correct way to spinlock is to attempt to acquire the lock in a loop until it succeeds. That's it.


// correct
while (cas(lock, expected: 0, desired: 1) != 0)
  smart_stepoff_yield_strategy();
critical_section();
atomic_store(lock, 0); // efficient unlock

// wrong (e.g. this is what you have now)
while (expensive_atomic_load_emulated_via_cas(lock) == 1)
  inefficient_sleep_strategy();
cas(lock, expected: 0, desired: 1); // holy data race, batman
critical_section();
cas(lock, expected: 1, desired: 0); // inefficient unlock
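
For reference, the "correct" pseudocode above might look like this with C11 atomics. This is a sketch under assumptions: the SpinLock type and the Spin* functions are hypothetical, and the back-off strategy is left as a comment rather than implemented.

```c
#include <stdatomic.h>
#include <stdbool.h>

// Hypothetical spinlock: 0 = unlocked, 1 = locked.
typedef struct {
    atomic_uint state;
} SpinLock;

static void SpinLockInit(SpinLock *l) {
    atomic_init(&l->state, 0u);
}

// One attempt: the CAS *is* the acquisition, so there is no gap between
// "saw it free" and "took it".
static bool SpinTryAcquire(SpinLock *l) {
    unsigned expected = 0;
    return atomic_compare_exchange_strong_explicit(
        &l->state, &expected, 1u, memory_order_acquire, memory_order_relaxed);
}

// Retry the CAS until it succeeds.
static void SpinAcquire(SpinLock *l) {
    unsigned expected = 0;
    while (!atomic_compare_exchange_weak_explicit(
               &l->state, &expected, 1u, memory_order_acquire, memory_order_relaxed)) {
        expected = 0; // a failed CAS stores the current value in `expected`
        // back-off/yield strategy goes here (pause, SwitchToThread, Sleep, ...)
    }
}

// Unlock is a single release store; no CAS is needed because only the owner unlocks.
static void SpinRelease(SpinLock *l) {
    atomic_store_explicit(&l->state, 0u, memory_order_release);
}
```

The acquire ordering on the successful CAS and the release ordering on the unlock store are what keep the critical section's reads and writes from leaking outside the lock.
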

Atomics and intrinsics aren't really beginner- or even expert-friendly. Using atomics correctly and efficiently is _super ****ing hard_ and even battle-hardened experts get them wrong so often it hurts just to think about it. I suggest you spend considerable time studying industry-standard implementations of spinlocks and lock-free queues and the like before you go off trying to reinvent the wheel using the most complicated tools imaginable. :)

Sean Middleditch – Game Systems Engineer – Join my team!

What was I thinking... The only thing that's fine is the linked list push/pop. Everything else is garbage. Don't code too late!

Now back to getting a multiple-producer, single-consumer thread-safe queue working on Win32 without any libraries.

I got it working for a single producer and a single consumer.

- Fixed size queue

- No pointer chasing (Free-list)

- Atomic operations for locks

- Plain C

- Supports returning a slot's existing data pointer instead of storing a new one

Here is the code:


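#include <assert.h>
#include <windows.h> // Sleep

// U8, U32, B32, memory_index, MemoryArena, PushSize and AtomicCompareExchangeU32
// come from the author's own codebase and are not shown here.

typedef struct QueueItem {
	U8 *data;
	struct QueueItem *next;
} QueueItem;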

typedef struct Queue {
	U8 *memory;
	U32 capacity;
	U32 used;
	volatile U32 lock;
	QueueItem *first;
	QueueItem *last;
	QueueItem *firstFree;
	QueueItem *lastFree;
} Queue;

void QueueInit(MemoryArena *arena, Queue *queue, U32 maxCount) {
	assert(maxCount > 0);
	memory_index size = maxCount * sizeof(QueueItem);
	U8 *memory = PushSize(arena, size);
	queue->memory = memory;
	queue->capacity = maxCount;
	queue->used = 0;
	queue->lock = 0;
	queue->first = 0;
	queue->last = 0;
	// Thread every item onto the free list: item i points to item i + 1
	QueueItem *items = (QueueItem *)memory;
	for (U32 i = 0; i < maxCount; ++i) {
		items[i].data = 0;
		items[i].next = (i + 1 < maxCount) ? &items[i + 1] : 0;
	}
	queue->firstFree = &items[0];
	queue->lastFree = &items[maxCount - 1];
	assert(queue->lastFree->next == 0);
}

B32 QueueHasItems(Queue *queue) {
	B32 result = queue->first != 0;
	return result;
}

B32 QueueCanPush(Queue *queue) {
	B32 result = queue->firstFree != 0;
	return result;
}

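void QueueLock(Queue *queue) {
	// Keep trying to swap 0 -> 1 until the swap itself succeeds. The CAS returns
	// the previous value, so 0 means this thread now owns the lock.
	while (AtomicCompareExchangeU32(&queue->lock, 1, 0) != 0) {
		Sleep(1);
	}
}

void QueueUnlock(Queue *queue) {
	// Plain volatile store; with MSVC's /volatile:ms (the default on x86/x64) it has release semantics
	queue->lock = 0;
}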

// Not locked internally: call between QueueLock and QueueUnlock
U8 *QueuePush(Queue *queue, U8 *data) {
	U8 * result = 0;

	// Take the first item of the free list
	assert(queue->firstFree);
	QueueItem *item = queue->firstFree;

	// Unlink it from the head of the free list
	QueueItem *nextFreeItem = item->next;
	queue->firstFree = nextFreeItem;
	if (!queue->firstFree)
		queue->lastFree = 0;

	// Set data fields; with data == 0 the slot keeps (and returns) whatever pointer it already holds
	if (data)
		item->data = data;
	item->next = 0;
	result = item->data;

	// Append item to the tail of the list
	if (queue->first == 0 && queue->last == 0) {
		queue->first = queue->last = item;
	} else {
		assert(queue->last);
		queue->last->next = item;
		queue->last = item;
	}

	++queue->used;

	return result;
}

U8 *QueuePop(Queue *queue) {
	// Pop item from the head of the list
	assert(queue->first);
	QueueItem *item = queue->first;
	U8 *result = item->data;
	QueueItem *next = item->next;
	if (next) {
		queue->first = next;
	} else {
		queue->last = 0;
		queue->first = 0;
	}

	// Clear the link (data is left in the slot for reuse)
	item->next = 0;

	// Push item to the tail of the free list
	QueueItem *lastFree = queue->lastFree;
	if (lastFree) {
		lastFree->next = item;
	} else {
		queue->firstFree = item;
	}
	// Update the queue's tail pointer itself, not just the local copy
	queue->lastFree = item;

	// Decrease used by one
	--queue->used;

	return result;
}

#define QUEUE_ADD(queue, data) (QueuePush((queue), (U8 *)(data)))
#define QUEUE_PUSH(queue, type) ((type *)QueuePush((queue), 0))
#define QUEUE_POP(queue, type) ((type *)QueuePop((queue)))

Have fun.

Not sure where you're going, but wouldn't a round-robin buffer be easier here?
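A minimal sketch of that suggestion: a fixed-size single-producer, single-consumer ring buffer using C11 atomics. RING_CAPACITY, SpscRing and the Ring* functions are hypothetical; the capacity must be a power of two so the free-running indices can be masked. It only covers one producer and one consumer; with several producers, one option would be to serialize RingPush behind a spinlock while the consumer side stays lock-free.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_CAPACITY 8u // hypothetical size; must be a power of two

typedef struct {
    void *slots[RING_CAPACITY];
    atomic_size_t head; // next slot to read; written only by the consumer
    atomic_size_t tail; // next slot to write; written only by the producer
} SpscRing;

static void RingInit(SpscRing *r) {
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

// Producer side. Returns false if the ring is full.
static bool RingPush(SpscRing *r, void *item) {
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&r->head, memory_order_acquire);
    if (tail - head == RING_CAPACITY)
        return false;
    r->slots[tail & (RING_CAPACITY - 1)] = item;
    // Release: the slot write becomes visible before the new tail does
    atomic_store_explicit(&r->tail, tail + 1, memory_order_release);
    return true;
}

// Consumer side. Returns false if the ring is empty.
static bool RingPop(SpscRing *r, void **out) {
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&r->tail, memory_order_acquire);
    if (head == tail)
        return false;
    *out = r->slots[head & (RING_CAPACITY - 1)];
    // Release: the slot read is finished before the producer may reuse it
    atomic_store_explicit(&r->head, head + 1, memory_order_release);
    return true;
}
```

head and tail are never reset. Unsigned subtraction still gives the right fill level after they wrap, because the power-of-two capacity divides SIZE_MAX + 1. No free list and no next pointers are needed, which is the main attraction over the linked-list version.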

There is a mistake in the dequeue method: you need to do temp = temp->next;
then free(first); first = temp;
You are freeing the second element each time, and if free->next is null it will crash.

This is almost exactly what this implementation does, except that it first checks whether there is a next pointer and assigns it to first; otherwise it just sets first and last to zero. Also, I don't need to free any memory, because I use a pre-allocated memory system where all memory is allocated once at startup and released when the application exits. No need for alloc or free while the app is running ;) Just PushSize, PushStruct, PushArray, etc.
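For comparison, a minimal sketch of that pop path plus the matching push, assuming the caller already holds the queue lock; PoolItem, PoolQueue and the PoolQueue* functions are hypothetical stand-ins for the thread's types. One detail worth checking against the posted code: when an item goes back onto the free list, the new tail has to be written to the queue's own lastFree field. The first QueuePop in the thread assigns it only to a local copy (lastFree = item;), which leaves queue->lastFree stale, so a later pop can link a free item onto a node that QueuePush has already handed back out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

// Hypothetical types mirroring the thread's first/last + firstFree/lastFree layout
typedef struct PoolItem {
    void *data;
    struct PoolItem *next;
} PoolItem;

typedef struct {
    PoolItem *first, *last;         // queued items, FIFO order
    PoolItem *firstFree, *lastFree; // unused items
} PoolQueue;

// Thread all `count` items of the caller-provided array onto the free list.
static void PoolQueueInit(PoolQueue *q, PoolItem *items, size_t count) {
    assert(count > 0);
    for (size_t i = 0; i < count; ++i) {
        items[i].data = NULL;
        items[i].next = (i + 1 < count) ? &items[i + 1] : NULL;
    }
    q->first = q->last = NULL;
    q->firstFree = &items[0];
    q->lastFree = &items[count - 1];
}

// Returns false if no free item is left. Caller must hold the queue lock.
static bool PoolQueuePush(PoolQueue *q, void *data) {
    PoolItem *item = q->firstFree;
    if (!item)
        return false;
    q->firstFree = item->next; // take from the head of the free list
    if (!q->firstFree)
        q->lastFree = NULL;
    item->data = data;
    item->next = NULL;
    if (q->last)
        q->last->next = item;  // append to the tail of the queue
    else
        q->first = item;
    q->last = item;
    return true;
}

// Caller must hold the queue lock and know the queue is non-empty.
static void *PoolQueuePop(PoolQueue *q) {
    PoolItem *item = q->first;
    assert(item);
    void *result = item->data;
    q->first = item->next;     // take from the head of the queue
    if (!q->first)
        q->last = NULL;
    item->data = NULL;
    item->next = NULL;
    if (q->lastFree)
        q->lastFree->next = item; // append to the tail of the free list
    else
        q->firstFree = item;
    q->lastFree = item;        // update the queue's field, not a local copy
    return result;
}
```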

I would recommend reading the following article in Dr. Dobb's:

http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448
