Hm, I just tried again. I ran the server for about 20 minutes before the same problem occurred. I was keeping track of all connected clients, and no new one ever connected, so the message must come from one of my own clients. But I use asserts in the client to make sure it never sends illegal data. Moreover, I send the exact same message about 30000 times before the problem occurs: I just connected the client and let it sit there sending and receiving updates. This is starting to annoy me. I find it entirely believable that I have made a mistake; I just find it strange that it happens all of a sudden after I have done the exact same thing thousands of times before.
Any ideas?
Some code (in Clojure, but using Java socket channels; if you know Java and are curious, you can probably figure out what I'm doing). The error occurs in the first file, when I'm reading from the channel, in this piece (I get a negative value out of getShort on the buffer):
(zero? (.remaining rbuf)) (let [len (.. rbuf (flip) (getShort))
                                test (if (neg? len)
                                       (println (.getInetAddress (.socket chan))))
                                new-netmap (assoc netmap :read-tmp (ByteBuffer/allocate len))]
which is part of this next file
Shared networking code:
;;;; Remember the following while working with this code:
;;;;
;;;; * You must use channels and buffers when using select, not
;;;;   the socket streams.
;;;;
;;;; * When using select, you have to call functions in this order:
;;;;     select
;;;;     selectedKeys
;;;;     do stuff with keys in the set
;;;;     remove keys from the key set
;;;;
;;;;   You cannot rely only on the flags of selection keys.
;;;;
;;;; * When writing to buffers, first write, then flip, then read, then clear.
;;;;
;;;; * Newly created buffers have (.remaining buffer) = their length,
;;;;   so if you want to create a buffer used for writing, it can appear to
;;;;   have data in it from the beginning. Use (doto new-buff (.flip)).

(ns game.networking
  (:import java.nio.ByteBuffer
           (java.io ObjectOutputStream ByteArrayOutputStream
                    ObjectInputStream ByteArrayInputStream
                    IOException)))

(defn- to-byte-array
  "Turns obj into a byte[]."
  [obj]
  (with-open [baos (ByteArrayOutputStream.)
              oos (ObjectOutputStream. baos)]
    (.writeObject oos obj)
    (.toByteArray baos)))

(defn- from-byte-array
  "Turns the byte[] ba into an object."
  [ba]
  (with-open [bais (ByteArrayInputStream. ba)
              ois (ObjectInputStream. bais)]
    (.readObject ois)))

(defn- channel-operation
  "Calls op (a read/write fn) on chan and returns
  true if it filled/emptied the buffer."
  [op chan buffer]
  (op chan buffer)
  (zero? (.remaining buffer)))

(defn- read-into-buffer
  "Reads from chan into buffer. If the buffer becomes full,
  returns true, else false."
  [chan buffer]
  (channel-operation (memfn read buf) chan buffer))

(defn- write-from-buffer
  "Writes from buffer into chan. If the whole buffer was written,
  returns true, else false."
  [chan buffer]
  (channel-operation (memfn write buf) chan buffer))

(defn read-from-channel
  "Reads data from a channel found in netmap.
  The netmap needs to contain chan, rbuf and read-tmp.
  If netmap has been changed as a result of reading, calls
  new-netmap-f with the new netmap as argument.
  If a message has been completely read, calls new-msg-f
  on the message.
  Operates in the following manner:
  1) Checks if there is an incomplete message in read-tmp and,
     if so, tries to complete it.
  2) If not, checks if the length of the next message has
     been read, and if so, creates a new ByteBuffer of the
     correct length and assocs it with read-tmp in netmap.
  3) Otherwise, try to read the length of the next
     message into rbuf."
  [{:keys [chan rbuf read-tmp] :as netmap} new-netmap-f new-msg-f]
  (cond
    ; incomplete msg: read more
    read-tmp (if (read-into-buffer chan read-tmp)
               (let [msg (from-byte-array (.array read-tmp))
                     new-netmap (assoc netmap :read-tmp nil)]
                 (new-netmap-f new-netmap)
                 (new-msg-f msg new-netmap)
                 (recur new-netmap new-netmap-f new-msg-f)))
    ; have read length: prepare buffer for msg
    (zero? (.remaining rbuf)) (let [len (.. rbuf (flip) (getShort))
                                    test (if (neg? len)
                                           (println (.getInetAddress (.socket chan))))
                                    new-netmap (assoc netmap :read-tmp (ByteBuffer/allocate len))]
                                (.clear rbuf)
                                (new-netmap-f new-netmap)
                                (recur new-netmap new-netmap-f new-msg-f))
    ; new msg: read 2 bytes from chan into rbuf (the length of the next message)
    ; (tries to fill rbuf, so if it can only read 1 byte,
    ; will come back to here later)
    :else (if (read-into-buffer chan rbuf) (recur netmap new-netmap-f new-msg-f))))

(defn write-to-channel
  "Writes data to a channel, found in netmap. The
  netmap needs to contain chan, wbuf, write-tmp and write-q.
  If netmap has been changed as a result of writing, calls
  new-netmap-f with the new netmap as argument."
  [{:keys [chan wbuf write-tmp write-q] :as netmap} new-netmap-f]
  (cond
    ; length not written: write it
    (pos? (.remaining wbuf)) (if (write-from-buffer chan wbuf)
                               (recur netmap new-netmap-f))
    ; msg not written: write it
    write-tmp (if (write-from-buffer chan write-tmp)
                (let [new-netmap (assoc netmap :write-tmp nil)]
                  (new-netmap-f new-netmap)
                  (recur new-netmap new-netmap-f)))
    ; no incomplete msg: see if there's one in write-q
    write-q (let [test (assert (= (class "a") (class (last write-q))))
                  msg (to-byte-array (last write-q))
                  len (alength msg)]
              (.clear wbuf)
              (assert (pos? (short len)))
              (.putShort wbuf (short len))
              (.flip wbuf)
              (let [new-netmap (assoc netmap
                                      :write-tmp (ByteBuffer/wrap msg)
                                      :write-q (butlast write-q))]
                (new-netmap-f new-netmap)
                (recur new-netmap new-netmap-f)))))
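The buffer rules from the header comment of the shared networking code, and the 2-byte length prefix it uses, can be tried out in isolation at a REPL. Below is a minimal sketch of that, not the code above: `buffer-states`, `frame` and `unframe` are hypothetical helper names, and plain UTF-8 bytes stand in for the serialized message.

```clojure
(import 'java.nio.ByteBuffer)

(defn buffer-states
  "Walks a 2-byte buffer through write, flip, read, clear and
  records (.remaining buf) after each step."
  []
  (let [buf         (ByteBuffer/allocate 2)
        fresh       (.remaining buf)   ; a new buffer reports its full capacity
        _           (.putShort buf (short 300))
        after-put   (.remaining buf)   ; nothing left to write into
        _           (.flip buf)
        after-flip  (.remaining buf)   ; two bytes ready to be read
        len         (long (.getShort buf))
        after-get   (.remaining buf)   ; everything has been read
        _           (.clear buf)
        after-clear (.remaining buf)]  ; ready to be written again
    {:fresh fresh :after-put after-put :after-flip after-flip
     :len len :after-get after-get :after-clear after-clear}))

(defn frame
  "Returns a flipped ByteBuffer holding payload (a byte[]) prefixed
  with its length as a 2-byte signed short."
  [^bytes payload]
  (let [len (alength payload)]
    ; a signed short cannot represent lengths above 32767
    (when (> len Short/MAX_VALUE)
      (throw (IllegalArgumentException.
              (str "payload too long for a 2-byte length: " len))))
    (doto (ByteBuffer/allocate (+ 2 len))
      (.putShort (short len))
      (.put payload)
      (.flip))))

(defn unframe
  "Reads one length-prefixed message from buf and returns its payload."
  [^ByteBuffer buf]
  (let [len     (.getShort buf)
        payload (byte-array len)]
    (.get buf payload)
    payload))

(buffer-states)
(String. ^bytes (unframe (frame (.getBytes "hello" "UTF-8"))) "UTF-8")
```

The round trip only works when the reader starts at a length prefix; if it starts anywhere else in the byte stream, `getShort` returns whatever two bytes happen to be there.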
Module on the client side. Everything is sent with send-msg, and every msg is a String.
(ns game.networking.client
  (:import java.net.InetSocketAddress
           java.nio.channels.SocketChannel
           java.nio.ByteBuffer)
  (:require [game.networking :as net])
  (:use game.utility))

(def- netmap (atom nil))
(def- msgs (atom nil))

(defn get-msgs []
  (let [ms (reverse @msgs)]
    (reset! msgs nil)
    ms))

(defn send-msg [msg]
  (let [new-netmap (update-in @netmap [:write-q] conj msg)]
    (reset! netmap new-netmap)))

; make private later!
(defn update-netmap [nm]
  (reset! netmap nm))

; make private later!
(defn handle-msg [msg _]
  (swap! msgs conj (read-string msg)))

; make private later!
(defn create-netmap [ip port]
  {:chan (let [sc (doto (SocketChannel/open (InetSocketAddress. ip port))
                    (.configureBlocking false))]
           (.. sc (socket) (setTcpNoDelay true))
           sc)
   :write-q nil
   :read-tmp nil
   :write-tmp nil
   :rbuf (ByteBuffer/allocate 2)
   :wbuf (doto (ByteBuffer/allocate 2) (.flip))})

(defn connect-to-server [ip port]
  (let [nm (create-netmap ip port)]
    (update-netmap nm)))

(defn update-network []
  (net/write-to-channel @netmap update-netmap)
  (net/read-from-channel @netmap update-netmap handle-msg))
A networking module on the server side:
(ns game.networking.server
  (:import java.nio.ByteBuffer
           java.net.InetSocketAddress
           (java.nio.channels Selector ServerSocketChannel SocketChannel
                              SelectionKey)
           java.io.IOException)
  (:use game.utility
        clojure.contrib.repl-utils)
  (:require [game.networking :as net]))

; should be private later!
(def- clients (atom {}))
; should be private later!
(def- id-counter (atom 0))
; should be private later!
(def- msgs (atom nil))
(def- connected-addresses (atom nil))
(def- disconnected-clients (atom nil))
; debug
(def- bugg (atom []))

(defn get-msgs []
  (let [ms (reverse @msgs)]
    (reset! msgs nil)
    ms))

(defn send-msg [ids msg]
  (dorun (map (fn [id]
                (let [new-clients (update-in @clients [id :write-q] conj msg)]
                  (reset! clients new-clients)))
              ids)))

(defn create-server
  "Creates a non-blocking TCP server that listens for incoming
  connections and, when a client connects, calls f on the new
  client's SelectionKey.
  The server uses select to check for incoming connections and
  to check for incoming data on established connections. Returns
  a function that should be called periodically to call select.
  The returned function returns the set of selected SelectionKeys."
  [port f]
  (let [selector (Selector/open)
        ssc (doto (ServerSocketChannel/open) (.configureBlocking false))
        ss (doto (.socket ssc) (.bind (InetSocketAddress. port)))
        selection-key (.register ssc selector SelectionKey/OP_ACCEPT)]
    (fn []
      (swap! bugg conj (count (.keys selector)))
      (.select selector)
      (let [keys (.selectedKeys selector)]
        ; One has to check whether the selected-key set contains the key,
        ; not just the key's flags
        (if (and (.contains keys selection-key) (.isAcceptable selection-key))
          (let [sc (doto (.accept ssc) (.configureBlocking false))
                client-key (doto (.register sc selector (bit-or SelectionKey/OP_READ
                                                                SelectionKey/OP_WRITE))
                             (.attach (swap! id-counter inc)))]
            (show (.socket sc))
            (flush)
            (swap! connected-addresses conj (.getInetAddress (.socket sc)))
            (.. sc (socket) (setTcpNoDelay true))
            (f client-key)
            (.remove keys selection-key)))
        keys))))

(defn- new-client
  "Takes a SelectionKey as input and returns a map
  of various player-related things."
  [selection-key]
  {:s-key selection-key
   :chan (.channel selection-key)
   :write-q nil
   :read-tmp nil
   :write-tmp nil
   :rbuf (ByteBuffer/allocate 2)
   :wbuf (doto (ByteBuffer/allocate 2) (.flip))
   :id (.attachment selection-key)})

(defn add-new-client
  "Adds a new client to clients."
  [selection-key]
  (let [client (new-client selection-key)]
    (swap! clients assoc (.attachment selection-key) client)))

(defn- collect-new-msg [msg netmap]
  (swap! msgs conj (conj (read-string msg) (netmap :id))))

(defn- update-client-netmap [{id :id :as netmap}]
  (swap! clients assoc id netmap))

(defn- disconnect-client [{:keys [id chan]}]
  (swap! disconnected-clients conj id)
  (.close chan)
  (swap! clients dissoc id))

(defn get-disconnected-clients []
  (let [dcs @disconnected-clients]
    (reset! disconnected-clients nil)
    dcs))

(defn update-network
  "Calls the fn server, which performs a select operation on the server
  and accepts new incoming connections. Also reads messages from the clients
  and collects them, in the form of lists, in msgs."
  [server]
  (let [keys (server)]
    (doseq [key keys]
      (let [client (@clients (.attachment key))]
        (try
          (if (.isReadable key)
            (net/read-from-channel client update-client-netmap collect-new-msg))
          (if (and (client :write-q) (.isWritable key))
            (net/write-to-channel client update-client-netmap))
          (catch IOException e (disconnect-client client)))))
    (.clear keys)))
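One thing that might help narrow down where the negative length comes from is to compare it with what a message body looks like when it is read as a length. Every Java serialization stream starts with the magic bytes 0xAC 0xED, so a reader that is out of step and takes the start of a body for a length prefix would see one specific negative short. The sketch below is only an illustration under that assumption: `serialized-bytes` mirrors to-byte-array from the shared code (with an explicit flush added), and `first-two-bytes-as-short` is a hypothetical helper.

```clojure
(import '(java.io ByteArrayOutputStream ObjectOutputStream)
        'java.nio.ByteBuffer)

(defn serialized-bytes
  "Serializes obj with Java serialization and returns the byte[]."
  [obj]
  (with-open [baos (ByteArrayOutputStream.)
              oos  (ObjectOutputStream. baos)]
    (.writeObject oos obj)
    (.flush oos)
    (.toByteArray baos)))

(defn first-two-bytes-as-short
  "Interprets the first two bytes of ba the way rbuf's getShort would
  (big-endian, signed)."
  [^bytes ba]
  (long (.getShort (ByteBuffer/wrap ba))))

; The stream magic 0xACED read as a signed short is -21267,
; whatever object was serialized.
(first-two-bytes-as-short (serialized-bytes "any message"))
```

If the value printed next to the client address is -21267, the reader was probably looking at the start of a message body; any other negative value would point to bytes from somewhere else in the stream.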