• Advertisement
Sign in to follow this  

C Queue Linked List madness

This topic is 669 days old which is more than the 365 day threshold we allow for new replies. Please post a new topic.

If you intended to correct an error in the post then please contact us.

Recommended Posts

I am trying to create a thread-safe queue in C using instrinsics, but i get reproduceable crashes after some time.

 

Are there some stupid mistakes? There must be some mistakes in the linking of the linked list items.

This should be a simple head/tail including a first/last freelist implementation....

typedef struct QueueItem {
	U8 *data;
	QueueItem *next;
} QueueItem;

typedef struct Queue {
	U8 *memory;
	memory_index size;
	U32 maxCount;
	volatile U32 used;
	QueueItem *first;
	QueueItem *last;
	QueueItem *firstFree;
	QueueItem *lastFree;
	volatile U32 locked;
} Queue;

internal_method
void QueueInit(MemoryArena *arena, Queue *queue, U32 maxCount) {
	memory_index size = maxCount * sizeof(QueueItem);
	U8 *memory = PushSize(arena, size);
	queue->size = size;
	queue->memory = memory;
	queue->maxCount = maxCount;
	queue->used = 0;
	queue->first = 0;
	queue->firstFree = (QueueItem *)memory;
	queue->locked = 0;
	QueueItem *cur = (QueueItem *)memory;
	QueueItem *prev = 0;
	for (U32 i = 0; i < maxCount; ++i) {
		cur->data = 0;
		cur->next = 0;
		if (i < maxCount - 1) {
			QueueItem* next = cur + 1;
			cur->next = next;
			cur = cur->next;
		}
	}
	queue->lastFree = (QueueItem *)(memory + sizeof(QueueItem) * (maxCount - 1));
	Assert(queue->lastFree->next == 0);
}

inline B32 QueueIsFull(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used >= queue->maxCount;
	return result;
}

inline B32 QueueHasItems(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used > 0;
	return result;
}

inline void InterlockWait(volatile U32 *lock) {
	while (AtomicCompareExchangeU32(lock, 0, 0) == 1) {
		Sleep(1);
	}
}

internal_method
void QueuePush(Queue *queue, U8 *data) {
	// Wait and lock
	InterlockWait(&queue->locked);
	U32 prevLock = AtomicCompareExchangeU32(&queue->locked, 1, 0);
	Assert(prevLock == 0);

	// Peek item from head of the list
	Assert(queue->firstFree);
	QueueItem *item = queue->firstFree;
	Assert(!item->data);

	// Pop item from the tail of the free list
	QueueItem *nextFreeItem = item->next;
	queue->firstFree = nextFreeItem;
	if (!queue->firstFree)
		queue->lastFree = 0;

	// Set data fields
	item->data = data;
	item->next = 0;

	// Pop item from head of the list
	if (queue->first == 0) {
		queue->first = item;
		queue->last = item;
	} else {
		Assert(queue->last);
		Assert(queue->last->next == 0);
		QueueItem *prevLast = queue->last;
		queue->last = item;
		prevLast->next = item;
	}

	// Increment used count
	AtomicCompareExchangeU32(&queue->used, queue->used + 1, queue->used);

	// Unlock
	prevLock = AtomicCompareExchangeU32(&queue->locked, 0, 1);
	Assert(prevLock == 1);
}

internal_method
U8 *QueuePop(Queue *queue) {
	// Wait and lock
	InterlockWait(&queue->locked);
	U32 prevLock = AtomicCompareExchangeU32(&queue->locked, 1, 0);
	Assert(prevLock == 0);

	// Pop item from the head of the list
	Assert(queue->first);
	QueueItem *item = queue->first;
	U8 *result = item->data;
	QueueItem *next = item->next;
	if (next) {
		queue->first = next;
	} else {
		queue->last = 0;
		queue->first = 0;
	}

	// Clear data fields
	item->data = 0;
	item->next = 0;

	// Push item to the tail of the free list
	QueueItem *lastFree = queue->lastFree;
	if (lastFree) {
		lastFree->next = item;
		lastFree = item;
	} else {
		queue->firstFree = item;
		queue->lastFree = item;
	}

	// Decrement used count
	AtomicCompareExchangeU32(&queue->used, queue->used - 1, queue->used);

	// Unlock
	prevLock = AtomicCompareExchangeU32(&queue->locked, 0, 1);
	Assert(prevLock == 1);

	return result;
}

I dont see the mistake.

Edited by Finalspace

Share this post


Link to post
Share on other sites
Advertisement

Sleeping mode, but i got it..... NOT...

 

Changed the following:

inline B32 QueueIsFull(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used >= queue->maxCount;
	return result;
}
 
inline B32 QueueHasItems(Queue *queue) {
	U32 used = AtomicCompareExchangeU32(&queue->used, 0, 0);
	B32 result = used > 0;
	return result;
}

to:

inline B32 QueueIsFull(Queue *queue) {
	B32 result = queue->firstFree == 0;
	return result;
}

inline B32 QueueHasItems(Queue *queue) {
	B32 result = queue->first != 0;
	return result;
}

Does still does not work :-(

Edited by Finalspace

Share this post


Link to post
Share on other sites
Glancing at it I don't see it right off. You have a bunch of pointer following and indirection going on, so as a hunch it is likely following a null pointer or an invalid (uninitialized or released) pointer value.

To save time for those of us not wanting to run it:

* what line of your post do the reproducible crashes happen on?
* what is the exact text of the crash message?

Share this post


Link to post
Share on other sites

What was i thinking... the only thing which is fine is the linked list push/pop. Everything else is garbage. Dont code too late!

Now back to get a multiple producer, single consumer thread safe queue without any libraries in win32 working.

Share this post


Link to post
Share on other sites

I got it working for a single producer, single consumer.

 

- Fixed size queue

- No pointer chasing (Free-list)

- Atomic operations for locks

- Plain C

- Supports returning data instead of putting

 

Here is the code:

typedef struct QueueItem {
	U8 *data;
	QueueItem *next;
} QueueItem;

typedef struct Queue {
	U8 *memory;
	U32 capacity;
	U32 used;
	volatile U32 lock;
	QueueItem *first;
	QueueItem *last;
	QueueItem *firstFree;
	QueueItem *lastFree;
} Queue;

void QueueInit(MemoryArena *arena, Queue *queue, U32 maxCount) {
	memory_index size = maxCount * sizeof(QueueItem);
	U8 *memory = PushSize(arena, size);
	queue->memory = memory;
	queue->capacity = maxCount;
	queue->used = 0;
	queue->first = 0;
	queue->firstFree = (QueueItem *)memory;
	QueueItem *cur = (QueueItem *)memory;
	QueueItem *prev = 0;
	for (U32 i = 0; i < maxCount; ++i) {
		cur->data = 0;
		cur->next = 0;
		if (i < maxCount - 1) {
			QueueItem* next = cur + 1;
			cur->next = next;
			cur = cur->next;
		}
	}
	queue->lastFree = (QueueItem *)(memory + sizeof(QueueItem) * (maxCount - 1));
	assert(queue->lastFree->next == 0);
}

B32 QueueHasItems(Queue *queue) {
	B32 result = queue->first != 0;
	return result;
}

B32 QueueCanPush(Queue *queue) {
	B32 result = queue->firstFree != 0;
	return result;
}

void QueueLock(Queue *queue) {
	U32 cmp = queue->lock;
	U32 cur;
	while ((cur = AtomicCompareExchangeU32(&queue->lock, 1, cmp)) == 1) {
		Sleep(1);
		cmp = cur;
	}
}

void QueueUnlock(Queue *queue) {
	queue->lock = 0;
}

U8 *QueuePush(Queue *queue, U8 *data) {
	U8 * result = 0;

	// Peek item from head of the list
	assert(queue->firstFree);
	QueueItem *item = queue->firstFree;

	// Pop item from the tail of the free list
	QueueItem *nextFreeItem = item->next;
	queue->firstFree = nextFreeItem;
	if (!queue->firstFree)
		queue->lastFree = 0;

	// Set data fields
	if (data)
		item->data = data;
	item->next = 0;
	result = item->data;

	// Pop item from head of the list
	if (queue->first == 0 && queue->last == 0) {
		queue->first = queue->last = item;
	} else {
		assert(queue->last);
		queue->last->next = item;
		queue->last = item;
	}

	++queue->used;

	return result;
}

U8 *QueuePop(Queue *queue) {
	// Pop item from the head of the list
	assert(queue->first);
	QueueItem *item = queue->first;
	U8 *result = item->data;
	QueueItem *next = item->next;
	if (next) {
		queue->first = next;
	} else {
		queue->last = 0;
		queue->first = 0;
	}

	// Clear data fields
	item->next = 0;

	// Push item to the tail of the free list
	QueueItem *lastFree = queue->lastFree;
	if (lastFree) {
		lastFree->next = item;
		lastFree = item;
	} else {
		queue->firstFree = item;
		queue->lastFree = item;
	}

	// Decrease used by one
	--queue->used;

	return result;
}

#define QUEUE_ADD(queue, data) (QueuePush((queue), (U8 *)(data)))
#define QUEUE_PUSH(queue, type) ((type *)QueuePush((queue), 0))
#define QUEUE_POP(queue, type) ((type *)QueuePop((queue)))

Have fun.

Edited by Finalspace

Share this post


Link to post
Share on other sites

There is a mistake in dequeue method ..you need to do temp = temp->next;
then free(first); first = temp;
You are freeing the second element each time and if free->next is null then it will crash.

Share this post


Link to post
Share on other sites

There is a mistake in dequeue method ..you need to do temp = temp->next;
then free(first); first = temp;
You are freeing the second element each time and if free->next is null then it will crash.

 

This is almost exactly what this implementation do, but it first checks if there is a next pointer and then assign it back to the first and on the else case it just sets the first and last to Zero. Ah and i dont need to free any memory - because i use a pre-allocated memory system where all memory is allocated once at start up and released when the application is done. No Need for alloc or free while the app is running ;) Just PushSize, PushStruct, PushArray, etc.

Edited by Finalspace

Share this post


Link to post
Share on other sites
Sign in to follow this  

  • Advertisement