Jump to content

  • Log In with Google      Sign In   
  • Create Account

Finalspace

Member Since 29 Mar 2012
Offline Last Active Jun 22 2016 11:29 AM

Posts I've Made

In Topic: Single producer - multiple consumer fast queue?

21 June 2016 - 11:09 PM

Can't the queue just contain pointers to these large 300kb packets?

 

Then you can use a fast SPMC queue of pointers, and separately, a lock-free pool allocator of the packets themselves.

 

Oh this may work, i will give it a try.


In Topic: Single producer - multiple consumer fast queue?

21 June 2016 - 12:55 PM

How much memory do you need to copy? Because you would have to literally be copying several GB worth of data per second to show up as a problem. Unless this is the case, it sounds to me you're hitting false cache sharing issues while doing the copy, which would slowdown all cores considerably.

 

Hmm after analyzing, it happens to be around ~300 kb at max for a packet. So you are basically right - there is too much going on. Back to the drawing board.


In Topic: Single producer - multiple consumer fast queue?

21 June 2016 - 10:28 AM

First step: Write a lock based one, or better yet, use an existing queue such as ConcurrentQueue, which is already moderately lock free (source).
Second step: If profiling actually shows it to be a problem, then invest the time to replace it.

 

As for writing a lock free circular queue goes, for starters: Time consuming operations should be completed long before you ever attempt to insert the object into the queue. Your insertion into the queue should be one or more CAS operations based on the result of the CAS.

 

The queue itself is not the problem, but the fixed memory used for the queue entries and the process of copy the memory into the slot.

Adding or removing from the queue works just fine. Seems i need another lock for each entry to solve this problem - so a decoder can dequeue but waits until the memory is finished.


In Topic: C Queue Linked List madness

20 June 2016 - 01:01 AM

There is a mistake in dequeue method ..you need to do temp = temp->next;
then free(first); first = temp;
You are freeing the second element each time and if free->next is null then it will crash.

 

This is almost exactly what this implementation do, but it first checks if there is a next pointer and then assign it back to the first and on the else case it just sets the first and last to Zero. Ah and i dont need to free any memory - because i use a pre-allocated memory system where all memory is allocated once at start up and released when the application is done. No Need for alloc or free while the app is running ;) Just PushSize, PushStruct, PushArray, etc.


In Topic: C Queue Linked List madness

17 June 2016 - 08:59 AM

I got it working for a single producer, single consumer.

 

- Fixed size queue

- No pointer chasing (Free-list)

- Atomic operations for locks

- Plain C

- Supports returning data instead of putting

 

Here is the code:

typedef struct QueueItem {
	U8 *data;
	QueueItem *next;
} QueueItem;

typedef struct Queue {
	U8 *memory;
	U32 capacity;
	U32 used;
	volatile U32 lock;
	QueueItem *first;
	QueueItem *last;
	QueueItem *firstFree;
	QueueItem *lastFree;
} Queue;

void QueueInit(MemoryArena *arena, Queue *queue, U32 maxCount) {
	memory_index size = maxCount * sizeof(QueueItem);
	U8 *memory = PushSize(arena, size);
	queue->memory = memory;
	queue->capacity = maxCount;
	queue->used = 0;
	queue->first = 0;
	queue->firstFree = (QueueItem *)memory;
	QueueItem *cur = (QueueItem *)memory;
	QueueItem *prev = 0;
	for (U32 i = 0; i < maxCount; ++i) {
		cur->data = 0;
		cur->next = 0;
		if (i < maxCount - 1) {
			QueueItem* next = cur + 1;
			cur->next = next;
			cur = cur->next;
		}
	}
	queue->lastFree = (QueueItem *)(memory + sizeof(QueueItem) * (maxCount - 1));
	assert(queue->lastFree->next == 0);
}

B32 QueueHasItems(Queue *queue) {
	B32 result = queue->first != 0;
	return result;
}

B32 QueueCanPush(Queue *queue) {
	B32 result = queue->firstFree != 0;
	return result;
}

void QueueLock(Queue *queue) {
	U32 cmp = queue->lock;
	U32 cur;
	while ((cur = AtomicCompareExchangeU32(&queue->lock, 1, cmp)) == 1) {
		Sleep(1);
		cmp = cur;
	}
}

void QueueUnlock(Queue *queue) {
	queue->lock = 0;
}

U8 *QueuePush(Queue *queue, U8 *data) {
	U8 * result = 0;

	// Peek item from head of the list
	assert(queue->firstFree);
	QueueItem *item = queue->firstFree;

	// Pop item from the tail of the free list
	QueueItem *nextFreeItem = item->next;
	queue->firstFree = nextFreeItem;
	if (!queue->firstFree)
		queue->lastFree = 0;

	// Set data fields
	if (data)
		item->data = data;
	item->next = 0;
	result = item->data;

	// Pop item from head of the list
	if (queue->first == 0 && queue->last == 0) {
		queue->first = queue->last = item;
	} else {
		assert(queue->last);
		queue->last->next = item;
		queue->last = item;
	}

	++queue->used;

	return result;
}

U8 *QueuePop(Queue *queue) {
	// Pop item from the head of the list
	assert(queue->first);
	QueueItem *item = queue->first;
	U8 *result = item->data;
	QueueItem *next = item->next;
	if (next) {
		queue->first = next;
	} else {
		queue->last = 0;
		queue->first = 0;
	}

	// Clear data fields
	item->next = 0;

	// Push item to the tail of the free list
	QueueItem *lastFree = queue->lastFree;
	if (lastFree) {
		lastFree->next = item;
		lastFree = item;
	} else {
		queue->firstFree = item;
		queue->lastFree = item;
	}

	// Decrease used by one
	--queue->used;

	return result;
}

#define QUEUE_ADD(queue, data) (QueuePush((queue), (U8 *)(data)))
#define QUEUE_PUSH(queue, type) ((type *)QueuePush((queue), 0))
#define QUEUE_POP(queue, type) ((type *)QueuePop((queue)))

Have fun.


PARTNERS