Wednesday, August 17, 2011

Semaphore in Silverlight

I needed to make a Silverlight front-end for my project. And since it will communicate with the server, I wanted to put the network I/O in a separate queue.

In the first minutes of coding I noticed that Visual Studio doesn't complete "Semaph" to "Semaphore", and then I discovered that there is no such thing implemented in Silverlight. Really? Seems so.

They say it's not hard to re-implement the semaphore. But the one in the example simply calls 'Thread.Sleep(50);' in a loop to check for changes. That is at most 1000ms/50ms = 20 checks per second, which means the semaphore adds up to 50ms of latency to every wait. All in all, that's bad and dirty. (It also uses a plain 'count++' with no Interlocked functions => buggy code.)
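To show why the plain 'count++' is a problem, here is a minimal sketch. CounterDemo and its method names are made up for illustration; this is not the code from that example. Several threads bump one shared counter, once with 'count++' and once with Interlocked.Increment. Only the Interlocked version is guaranteed to reach the expected total; the 'count++' version may lose updates.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, not part of the semaphore in this post.
public static class CounterDemo
{
    // Starts `threads` workers that each call `body` `iterations` times,
    // then waits for all of them to finish.
    static void RunWorkers(int threads, int iterations, Action body)
    {
        var workers = new Thread[threads];
        for (int t = 0; t < threads; ++t)
        {
            workers[t] = new Thread(() =>
            {
                for (int i = 0; i < iterations; ++i)
                    body();
            });
            workers[t].Start();
        }
        foreach (var w in workers)
            w.Join();
    }

    // Read-modify-write without synchronization: increments can be lost.
    public static int CountUnsafe(int threads, int iterations)
    {
        int count = 0;
        RunWorkers(threads, iterations, () => { count++; });
        return count;
    }

    // Atomic increment: every call is counted.
    public static int CountInterlocked(int threads, int iterations)
    {
        int count = 0;
        RunWorkers(threads, iterations, () => { Interlocked.Increment(ref count); });
        return count;
    }

    public static void Main()
    {
        // The first number may come out lower than 4000000; the second may not.
        Console.WriteLine("count++     : " + CountUnsafe(4, 1000000));
        Console.WriteLine("Interlocked : " + CountInterlocked(4, 1000000));
    }
}
```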

Thinking about what a semaphore is, I realized it doesn't differ much from an AutoResetEvent. Roughly speaking, the latter acts like a semaphore with a maximum count of 1. If I could attach a counter to an AutoResetEvent and signal it again whenever the counter permits, that would be a semaphore.
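The "count 1" behaviour is easy to check. In the small sketch below (AutoResetDemo and CountWakeups are illustrative names of mine), the event is signaled several times while nobody waits, and then we count how many non-blocking waits succeed. The extra Set() calls are not remembered, which is exactly why a separate counter is needed.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, for illustration only.
public static class AutoResetDemo
{
    // Signals the event `signals` times with no waiter present,
    // then counts how many WaitOne(0) calls return true.
    public static int CountWakeups(int signals)
    {
        int wakeups = 0;
        using (var are = new AutoResetEvent(false))
        {
            for (int i = 0; i < signals; ++i)
                are.Set();              // repeated Set() on a signaled event changes nothing

            while (are.WaitOne(0))      // a successful wait resets the event
                ++wakeups;
        }
        return wakeups;
    }

    public static void Main()
    {
        Console.WriteLine(CountWakeups(3)); // prints 1
    }
}
```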

I made a draft:

using System;
using System.Diagnostics;
using System.Threading;

public class BSemaphore : IDisposable
    {
        int _count = 0;
        int _maxCount = int.MaxValue;
        EventWaitHandle _ewh;

        public BSemaphore()
        {
            _ewh = new AutoResetEvent(false);
        }

        public BSemaphore(int initialCount, int maxCount)
        {
            if (initialCount < 0)
                throw new ArgumentException("Semaphore value should be >= 0.");
            if (initialCount > maxCount)
                throw new ArgumentException();

            _count = initialCount;
            _maxCount = maxCount;
            _ewh = new AutoResetEvent(_count > 0);
        }

        protected void OnSuccessfullWait()
        {
            var res = Interlocked.Decrement(ref _count);
            Debug.Assert(res >= 0, "The decremented value should be always >= 0.");
            if (res > 0)
                _ewh.Set();
        }

        public bool WaitOne()
        {
            _ewh.WaitOne();
            OnSuccessfullWait();
            return true;
        }

        public bool WaitOne(TimeSpan timeout)
        {
            if (_ewh.WaitOne(timeout))
            {
                OnSuccessfullWait();
                return true;
            }
            else
                return false;
        }

        public bool WaitOne(int millisecondsTimeout)
        {
            if (_ewh.WaitOne(millisecondsTimeout))
            {
                OnSuccessfullWait();
                return true;
            }
            else
                return false;
        }

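        public void Release()
        {
            var res = Interlocked.Increment(ref _count);
            if (res > _maxCount)
            {
                // Undo the increment so that a failed Release() leaves the count unchanged.
                Interlocked.Decrement(ref _count);
                throw new ArgumentException("The value of Semaphore is bigger than predefined maxValue.");
            }

            // Only the 0 -> 1 transition signals here; OnSuccessfullWait() re-signals while permits remain.
            if (res == 1)
                _ewh.Set();
        }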

        public void Dispose()
        {
            if (_ewh != null)
            {
                _ewh.Dispose();
                _ewh = null;
            }
        }
    }

I tested it and ended up quite pleased with myself.

That's not the end, however.

What is handy about a traditional semaphore is that it is a kind of WaitHandle. That makes the following possible:

var wh = new WaitHandle[] { _ewhExit, _semQueue };
while (WaitHandle.WaitAny(wh) != 0)
{
    // get an item from the queue ...
}
// exit gracefully

where _ewhExit is a ManualResetEvent.
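For context, here is how that loop looks in a complete program. This is only a sketch and it assumes the full .NET Framework, where System.Threading.Semaphore exists (it is precisely what Silverlight lacks). QueueWorkerDemo, Run and the "drained" event are names I made up for the example. Note that WaitAny returns the lowest signaled index, so the exit event at index 0 wins over pending items; the sketch therefore signals exit only after the queue has been drained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class; assumes desktop .NET, not Silverlight.
public static class QueueWorkerDemo
{
    // Enqueues 0..itemCount-1, lets one worker consume them, returns them in processing order.
    public static List<int> Run(int itemCount)
    {
        var queue = new Queue<int>();
        var processed = new List<int>();

        using (var exit = new ManualResetEvent(false))
        using (var drained = new ManualResetEvent(itemCount == 0))
        using (var items = new Semaphore(0, int.MaxValue))
        {
            var worker = new Thread(() =>
            {
                var wh = new WaitHandle[] { exit, items };
                // Index 0 is the exit event; index 1 means one item is available.
                while (WaitHandle.WaitAny(wh) != 0)
                {
                    int item;
                    lock (queue)
                        item = queue.Dequeue();
                    processed.Add(item);
                    if (processed.Count == itemCount)
                        drained.Set();
                }
                // exit gracefully
            });
            worker.Start();

            for (int i = 0; i < itemCount; ++i)
            {
                lock (queue)
                    queue.Enqueue(i);
                items.Release();        // one permit per queued item
            }

            drained.WaitOne();          // wait until every item has been handled
            exit.Set();                 // then ask the worker to stop
            worker.Join();
        }
        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("processed " + Run(5).Count + " items"); // prints "processed 5 items"
    }
}
```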

I wanted the same feature. Therefore, I needed my own wrappers around the events and my own WaitHandle.

Here they are:

public abstract class BWaitHandle
    {
        protected abstract void OnSuccessfullWait();
        public abstract bool WaitOne();
        public abstract bool WaitOne(TimeSpan timeout);
        public abstract bool WaitOne(int millisecondsTimeout);

        internal abstract WaitHandle WaitHandle { get; }

        static WaitHandle[] ToWaitHandle(BWaitHandle[] waitHandles)
        {
            int n = waitHandles.Length;
            WaitHandle[] wh = new WaitHandle[n];

            for (int i = 0; i < n; ++i)
                wh[i] = waitHandles[i].WaitHandle;

            return wh;
        }

        public static int WaitAny(BWaitHandle[] waitHandles)
        {
            WaitHandle[] wh = ToWaitHandle(waitHandles);
            var res = WaitHandle.WaitAny(wh);
            if (res >= 0)
                waitHandles[res].OnSuccessfullWait();
            return res;
        }

        public static int WaitAny(BWaitHandle[] waitHandles, int millisecondsTimeout)
        {
            WaitHandle[] wh = ToWaitHandle(waitHandles);
            var res = WaitHandle.WaitAny(wh, millisecondsTimeout);
            // On timeout WaitAny returns WaitHandle.WaitTimeout, which is not a valid index.
            if (res != WaitHandle.WaitTimeout)
                waitHandles[res].OnSuccessfullWait();
            return res;
        }

        public static int WaitAny(BWaitHandle[] waitHandles, TimeSpan timeout)
        {
            WaitHandle[] wh = ToWaitHandle(waitHandles);
            var res = WaitHandle.WaitAny(wh, timeout);
            // On timeout WaitAny returns WaitHandle.WaitTimeout, which is not a valid index.
            if (res != WaitHandle.WaitTimeout)
                waitHandles[res].OnSuccessfullWait();
            return res;
        }

        // Not implemented yet. The return type is bool to match WaitHandle.WaitAll.
        public static bool WaitAll(BWaitHandle[] waitHandles)
        {
            throw new NotImplementedException();
        }

        public static bool WaitAll(BWaitHandle[] waitHandles, int millisecondsTimeout)
        {
            throw new NotImplementedException();
        }

        public static bool WaitAll(BWaitHandle[] waitHandles, TimeSpan timeout)
        {
            throw new NotImplementedException();
        }
    }


    public class BManualResetEvent : BWaitHandle, IDisposable
    {
        ManualResetEvent _mre;

        public BManualResetEvent(bool initialState)
        {
            _mre = new ManualResetEvent(initialState);
        }

        // Summary:
        //     Sets the state of the event to non-signaled, which causes threads to block.
        //
        // Returns:
        //     true if the operation succeeds; otherwise, false.
        public bool Reset()
        {
            return _mre.Reset();
        }
        //
        // Summary:
        //     Sets the state of the event to signaled, which allows one or more waiting
        //     threads to proceed.
        //
        // Returns:
        //     true if the operation succeeds; otherwise, false.
        public bool Set()
        {
            return _mre.Set();
        }

        protected override void OnSuccessfullWait()
        {
            // nothing special needed
        }

        public override bool WaitOne()
        {
            return _mre.WaitOne();
        }

        public override bool WaitOne(TimeSpan timeout)
        {
            return _mre.WaitOne(timeout);
        }

        public override bool WaitOne(int millisecondsTimeout)
        {
            return _mre.WaitOne(millisecondsTimeout);
        }

        internal override WaitHandle WaitHandle
        {
            get { return _mre; }
        }

        public void Dispose()
        {
            if (_mre != null)
            {
                _mre.Dispose();
                _mre = null;
            }
        }
    }


    public class BAutoResetEvent : BWaitHandle, IDisposable
    {
        AutoResetEvent _are;

        public BAutoResetEvent(bool initialState)
        {
            _are = new AutoResetEvent(initialState);
        }

        // Summary:
        //     Sets the state of the event to non-signaled, which causes threads to block.
        //
        // Returns:
        //     true if the operation succeeds; otherwise, false.
        public bool Reset()
        {
            return _are.Reset();
        }
        //
        // Summary:
        //     Sets the state of the event to signaled, which allows one or more waiting
        //     threads to proceed.
        //
        // Returns:
        //     true if the operation succeeds; otherwise, false.
        public bool Set()
        {
            return _are.Set();
        }

        protected override void OnSuccessfullWait()
        {
            // nothing special needed
        }

        public override bool WaitOne()
        {
            return _are.WaitOne();
        }

        public override bool WaitOne(TimeSpan timeout)
        {
            return _are.WaitOne(timeout);
        }

        public override bool WaitOne(int millisecondsTimeout)
        {
            return _are.WaitOne(millisecondsTimeout);
        }

        internal override WaitHandle WaitHandle
        {
            get { return _are; }
        }

        public void Dispose()
        {
            if (_are != null)
            {
                _are.Dispose();
                _are = null;
            }
        }
    }

    ////////////////////////////////////////////////////////////////
    // Updated semaphore
    ////////////////////////////////////////////////////////////////
    public class BSemaphore : BWaitHandle, IDisposable
    {
        int _count = 0;
        int _maxCount = int.MaxValue;
        EventWaitHandle _ewh;

        public BSemaphore()
        {
            _ewh = new AutoResetEvent(false);
        }

        public BSemaphore(int initialCount, int maxCount)
        {
            if (initialCount < 0)
                throw new ArgumentException("Semaphore value should be >= 0.");
            if (initialCount > maxCount)
                throw new ArgumentException();

            _count = initialCount;
            _maxCount = maxCount;
            _ewh = new AutoResetEvent(_count > 0);
        }

        protected override void OnSuccessfullWait()
        {
            var res = Interlocked.Decrement(ref _count);
            Debug.Assert(res >= 0, "The decremented value should be always >= 0.");
            if (res > 0)
                _ewh.Set();
        }

        public override bool WaitOne()
        {
            _ewh.WaitOne();
            OnSuccessfullWait();
            return true;
        }

        public override bool WaitOne(TimeSpan timeout)
        {
            if (_ewh.WaitOne(timeout))
            {
                OnSuccessfullWait();
                return true;
            }
            else
                return false;
        }

        public override bool WaitOne(int millisecondsTimeout)
        {
            if (_ewh.WaitOne(millisecondsTimeout))
            {
                OnSuccessfullWait();
                return true;
            }
            else
                return false;
        }

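        public void Release()
        {
            var res = Interlocked.Increment(ref _count);
            if (res > _maxCount)
            {
                // Undo the increment so that a failed Release() leaves the count unchanged.
                Interlocked.Decrement(ref _count);
                throw new ArgumentException("The value of Semaphore is bigger than predefined maxValue.");
            }

            // Only the 0 -> 1 transition signals here; OnSuccessfullWait() re-signals while permits remain.
            if (res == 1)
                _ewh.Set();
        }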

        public void Dispose()
        {
            if (_ewh != null)
            {
                _ewh.Dispose();
                _ewh = null;
            }
        }

        internal override WaitHandle WaitHandle
        {
            get { return _ewh; }
        }
    }

As an exercise for the attentive reader, I leave the WaitAll() methods unimplemented. (I'll do it after a few requests in the comments, however.)

Happy coding.

P.S. Disclaimer: the code is provided "AS IS".
