当前位置:  编程技术>.net/c#/asp.net

深入线程安全容器的实现方法

    来源: 互联网  发布时间:2014-10-19

    本文导语:  最近写了个小程序用到了C#4.0中的线程安全集合。想起很久以前用C#2.0开发的时候写后台windows服务,为了利用多线程实现生产者和消费者模型,经常要封装一些线程安全的容器,比如泛型队列和字典等等。下面就结合部分MS的源...

最近写了个小程序用到了C#4.0中的线程安全集合。想起很久以前用C#2.0开发的时候写后台windows服务,为了利用多线程实现生产者和消费者模型,经常要封装一些线程安全的容器,比如泛型队列和字典等等。下面就结合部分MS的源码和自己的开发经验浅显地分析一下如何实现线程安全容器以及实现线程安全容器容易产生的问题。

一、ArrayList

在C#早期版本中已经实现了线程安全的ArrayList,可以通过下面的方式构造线程安全的数组列表:

var array = ArrayList.Synchronized(new ArrayList());

我们从Synchronized方法入手,分析它的源代码看是如何实现线程安全的:

代码如下:

Synchronized        /// Returns an wrapper that is synchronized (thread safe).
        /// An wrapper that is synchronized (thread safe).
        /// The to synchronize.
        ///
        ///   is null.
        /// 2
        [HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
        public static ArrayList Synchronized(ArrayList list)
        {
            if (list == null)
            {
                throw new ArgumentNullException("list");
            }
            return new ArrayList.SyncArrayList(list);
        }


继续跟进去,发现SyncArrayList是一个继承自ArrayList的私有类,内部线程安全方法的实现经过分析,很多都是像下面这样lock(注意是lock_root对象而不是数组列表实例对象)一下完事:

lock (this._root)

有心的你可以查看SyncArrayList的源码:

代码如下:

SyncArrayList        [Serializable]
        private class SyncArrayList : ArrayList
        {
            private ArrayList _list;
            private object _root;
            public override int Capacity
            {
                get
                {
                    int capacity;
                    lock (this._root)
                    {
                        capacity = this._list.Capacity;
                    }
                    return capacity;
                }
                set
                {
                    lock (this._root)
                    {
                        this._list.Capacity = value;
                    }
                }
            }
            public override int Count
            {
                get
                {
                    int count;
                    lock (this._root)
                    {
                        count = this._list.Count;
                    }
                    return count;
                }
            }
            public override bool IsReadOnly
            {
                get
                {
                    return this._list.IsReadOnly;
                }
            }
            public override bool IsFixedSize
            {
                get
                {
                    return this._list.IsFixedSize;
                }
            }
            public override bool IsSynchronized
            {
                get
                {
                    return true;
                }
            }
            public override object this[int index]
            {
                get
                {
                    object result;
                    lock (this._root)
                    {
                        result = this._list[index];
                    }
                    return result;
                }
                set
                {
                    lock (this._root)
                    {
                        this._list[index] = value;
                    }
                }
            }
            public override object SyncRoot
            {
                get
                {
                    return this._root;
                }
            }
            internal SyncArrayList(ArrayList list)
                : base(false)
            {
                this._list = list;
                this._root = list.SyncRoot;
            }
            public override int Add(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.Add(value);
                }
                return result;
            }
            public override void AddRange(ICollection c)
            {
                lock (this._root)
                {
                    this._list.AddRange(c);
                }
            }
            public override int BinarySearch(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(value);
                }
                return result;
            }
            public override int BinarySearch(object value, IComparer comparer)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(value, comparer);
                }
                return result;
            }
            public override int BinarySearch(int index, int count, object value, IComparer comparer)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.BinarySearch(index, count, value, comparer);
                }
                return result;
            }
            public override void Clear()
            {
                lock (this._root)
                {
                    this._list.Clear();
                }
            }
            public override object Clone()
            {
                object result;
                lock (this._root)
                {
                    result = new ArrayList.SyncArrayList((ArrayList)this._list.Clone());
                }
                return result;
            }
            public override bool Contains(object item)
            {
                bool result;
                lock (this._root)
                {
                    result = this._list.Contains(item);
                }
                return result;
            }
            public override void CopyTo(Array array)
            {
                lock (this._root)
                {
                    this._list.CopyTo(array);
                }
            }
            public override void CopyTo(Array array, int index)
            {
                lock (this._root)
                {
                    this._list.CopyTo(array, index);
                }
            }
            public override void CopyTo(int index, Array array, int arrayIndex, int count)
            {
                lock (this._root)
                {
                    this._list.CopyTo(index, array, arrayIndex, count);
                }
            }
            public override IEnumerator GetEnumerator()
            {
                IEnumerator enumerator;
                lock (this._root)
                {
                    enumerator = this._list.GetEnumerator();
                }
                return enumerator;
            }
            public override IEnumerator GetEnumerator(int index, int count)
            {
                IEnumerator enumerator;
                lock (this._root)
                {
                    enumerator = this._list.GetEnumerator(index, count);
                }
                return enumerator;
            }
            public override int IndexOf(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value);
                }
                return result;
            }
            public override int IndexOf(object value, int startIndex)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value, startIndex);
                }
                return result;
            }
            public override int IndexOf(object value, int startIndex, int count)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.IndexOf(value, startIndex, count);
                }
                return result;
            }
            public override void Insert(int index, object value)
            {
                lock (this._root)
                {
                    this._list.Insert(index, value);
                }
            }
            public override void InsertRange(int index, ICollection c)
            {
                lock (this._root)
                {
                    this._list.InsertRange(index, c);
                }
            }
            public override int LastIndexOf(object value)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value);
                }
                return result;
            }
            public override int LastIndexOf(object value, int startIndex)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value, startIndex);
                }
                return result;
            }
            public override int LastIndexOf(object value, int startIndex, int count)
            {
                int result;
                lock (this._root)
                {
                    result = this._list.LastIndexOf(value, startIndex, count);
                }
                return result;
            }
            public override void Remove(object value)
            {
                lock (this._root)
                {
                    this._list.Remove(value);
                }
            }
            public override void RemoveAt(int index)
            {
                lock (this._root)
                {
                    this._list.RemoveAt(index);
                }
            }
            public override void RemoveRange(int index, int count)
            {
                lock (this._root)
                {
                    this._list.RemoveRange(index, count);
                }
            }
            public override void Reverse(int index, int count)
            {
                lock (this._root)
                {
                    this._list.Reverse(index, count);
                }
            }
            public override void SetRange(int index, ICollection c)
            {
                lock (this._root)
                {
                    this._list.SetRange(index, c);
                }
            }
            public override ArrayList GetRange(int index, int count)
            {
                ArrayList range;
                lock (this._root)
                {
                    range = this._list.GetRange(index, count);
                }
                return range;
            }
            public override void Sort()
            {
                lock (this._root)
                {
                    this._list.Sort();
                }
            }
            public override void Sort(IComparer comparer)
            {
                lock (this._root)
                {
                    this._list.Sort(comparer);
                }
            }
            public override void Sort(int index, int count, IComparer comparer)
            {
                lock (this._root)
                {
                    this._list.Sort(index, count, comparer);
                }
            }
            public override object[] ToArray()
            {
                object[] result;
                lock (this._root)
                {
                    result = this._list.ToArray();
                }
                return result;
            }
            public override Array ToArray(Type type)
            {
                Array result;
                lock (this._root)
                {
                    result = this._list.ToArray(type);
                }
                return result;
            }
            public override void TrimToSize()
            {
                lock (this._root)
                {
                    this._list.TrimToSize();
                }
            }
        }


ArrayList线程安全实现过程小结:定义ArrayList的私有实现SyncArrayList,子类内部通过lock同步构造实现线程安全,在ArrayList中通过Synchronized对外间接调用子类。  二、Hashtable

同样,在C#早期版本中实现了线程安全的Hashtable,它也是早期开发中经常用到的缓存容器,可以通过下面的方式构造线程安全的哈希表:

var ht = Hashtable.Synchronized(new Hashtable());

同样地,我们从Synchronized方法入手,分析它的源代码看是如何实现线程安全的:

代码如下:

Synchronized        /// Returns a synchronized (thread-safe) wrapper for the .
        /// A synchronized (thread-safe) wrapper for the .
        /// The to synchronize.
        ///
        ///   is null.
        /// 1
        [HostProtection(SecurityAction.LinkDemand, Synchronization = true)]
        public static Hashtable Synchronized(Hashtable table)
        {
            if (table == null)
            {
                throw new ArgumentNullException("table");
            }
            return new Hashtable.SyncHashtable(table);
        }

继续跟进去,发现SyncHashtable是一个继承自Hashtable和IEnumerable接口的私有类,内部线程安全方法的实现经过分析,很多都是像下面这样lock(注意是lock哈希表的SyncRoot Object实例对象而不是哈希表实例)一下完事:
lock (this._table.SyncRoot)

贴一下SyncHashtable的源码:

代码如下:

 [Serializable]
        private class SyncHashtable : Hashtable, IEnumerable
        {
            protected Hashtable _table;
            public override int Count
            {
                get
                {
                    return this._table.Count;
                }
            }
            public override bool IsReadOnly
            {
                get
                {
                    return this._table.IsReadOnly;
                }
            }
            public override bool IsFixedSize
            {
                get
                {
                    return this._table.IsFixedSize;
                }
            }
            public override bool IsSynchronized
            {
                get
                {
                    return true;
                }
            }
            public override object this[object key]
            {
                [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
                get
                {
                    return this._table[key];
                }
                set
                {
                    lock (this._table.SyncRoot)
                    {
                        this._table[key] = value;
                    }
                }
            }
            public override object SyncRoot
            {
                get
                {
                    return this._table.SyncRoot;
                }
            }
            public override ICollection Keys
            {
                get
                {
                    ICollection keys;
                    lock (this._table.SyncRoot)
                    {
                        keys = this._table.Keys;
                    }
                    return keys;
                }
            }
            public override ICollection Values
            {
                get
                {
                    ICollection values;
                    lock (this._table.SyncRoot)
                    {
                        values = this._table.Values;
                    }
                    return values;
                }
            }
            internal SyncHashtable(Hashtable table)
                : base(false)
            {
                this._table = table;
            }
            internal SyncHashtable(SerializationInfo info, StreamingContext context)
                : base(info, context)
            {
                this._table = (Hashtable)info.GetValue("ParentTable", typeof(Hashtable));
                if (this._table == null)
                {
                    throw new SerializationException(Environment.GetResourceString("Serialization_InsufficientState"));
                }
            }
            [SecurityCritical]
            public override void GetObjectData(SerializationInfo info, StreamingContext context)
            {
                if (info == null)
                {
                    throw new ArgumentNullException("info");
                }
                lock (this._table.SyncRoot)
                {
                    info.AddValue("ParentTable", this._table, typeof(Hashtable));
                }
            }
            public override void Add(object key, object value)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Add(key, value);
                }
            }
            public override void Clear()
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Clear();
                }
            }
            [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
            public override bool Contains(object key)
            {
                return this._table.Contains(key);
            }
            [TargetedPatchingOptOut("Performance critical to inline across NGen image boundaries")]
            public override bool ContainsKey(object key)
            {
                return this._table.ContainsKey(key);
            }
            public override bool ContainsValue(object key)
            {
                bool result;
                lock (this._table.SyncRoot)
                {
                    result = this._table.ContainsValue(key);
                }
                return result;
            }
            public override void CopyTo(Array array, int arrayIndex)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.CopyTo(array, arrayIndex);
                }
            }
            public override object Clone()
            {
                object result;
                lock (this._table.SyncRoot)
                {
                    result = Hashtable.Synchronized((Hashtable)this._table.Clone());
                }
                return result;
            }
            IEnumerator IEnumerable.GetEnumerator()
            {
                return this._table.GetEnumerator();
            }
            public override IDictionaryEnumerator GetEnumerator()
            {
                return this._table.GetEnumerator();
            }
            public override void Remove(object key)
            {
                lock (this._table.SyncRoot)
                {
                    this._table.Remove(key);
                }
            }
            public override void OnDeserialization(object sender)
            {
            }
            internal override KeyValuePairs[] ToKeyValuePairsArray()
            {
                return this._table.ToKeyValuePairsArray();
            }
        }


Hashtable线程安全实现过程小结:定义Hashtable的私有实现SyncHashtable,子类内部通过lock同步构造实现线程安全,在Hashtable中通过Synchronized对外间接调用子类。 三、4.0中的线程安全容器 1、ConcurrentQueue

从上面的实现分析来说,封装一个线程安全的容器看起来并不是什么难事,除了对线程安全容器的异常处理心有余悸,其他的似乎按步就班就可以了,不是吗?也许还有更高明的实现吧?

在4.0中,多了一个System.Collections.Concurrent命名空间,怀着忐忑的心情查看C#4.0其中的一个线程安全集合ConcurrentQueue的源码,发现它继承自IProducerConsumerCollection, IEnumerable, ICollection, IEnumerable接口,内部实现线程安全的时候,通过SpinWait和通过互锁构造(Interlocked)及SpinWait封装的Segment,间接实现了线程安全。Segment的实现比较复杂,和线程安全密切相关的方法就是TryXXX那几个方法,源码如下:

代码如下:

      private class Segment
        {
            internal T[] m_array;
            private int[] m_state;
            private ConcurrentQueue.Segment m_next;
            internal readonly long m_index;
            private int m_low;
            private int m_high;
            internal ConcurrentQueue.Segment Next
            {
                get
                {
                    return this.m_next;
                }
            }
            internal bool IsEmpty
            {
                get
                {
                    return this.Low > this.High;
                }
            }
            internal int Low
            {
                get
                {
                    return Math.Min(this.m_low, 32);
                }
            }
            internal int High
            {
                get
                {
                    return Math.Min(this.m_high, 31);
                }
            }
            internal Segment(long index)
            {
                this.m_array = new T[32];
                this.m_state = new int[32];
                this.m_high = -1;
                this.m_index = index;
            }
            internal void UnsafeAdd(T value)
            {
                this.m_high++;
                this.m_array[this.m_high] = value;
                this.m_state[this.m_high] = 1;
            }
            internal ConcurrentQueue.Segment UnsafeGrow()
            {
                ConcurrentQueue.Segment segment = new ConcurrentQueue.Segment(this.m_index + 1L);
                this.m_next = segment;
                return segment;
            }
            internal void Grow(ref ConcurrentQueue.Segment tail)
            {
                ConcurrentQueue.Segment next = new ConcurrentQueue.Segment(this.m_index + 1L);
                this.m_next = next;
                tail = this.m_next;
            }
            internal bool TryAppend(T value, ref ConcurrentQueue.Segment tail)
            {
                if (this.m_high >= 31)
                {
                    return false;
                }
                int num = 32;
                try
                {
                }
                finally
                {
                    num = Interlocked.Increment(ref this.m_high);
                    if (num 0)
            {
                return list[0];
            }
            return 0;
        }

当列表中的元素为1个的时候,上面的操作非常容易进入一个无厘头的陷阱之中。有人会问怎么会有陷阱,你看取数据之前都判断了啊,逻辑正确了啊,这哪里还有问题吗?

按照类似于1中的分析,GetFirstOrDefault应该可以分为下面两步:

(1)线程1取数据,判断list.Count的时候发现列表内有1个元素,这一步线程安全,没有任何问题,然后准备返回索引为0的元素;

(2)线程2在线程1将要取索引为0的元素之前移除了列表中的唯一元素或者直接将list指向null,这样线程1通过索引取元素就抛出异常了。

 

3、如何保证容器数据操作安全?

从上面的两个示例,我们得知通常所看到的线程安全实际上并不一定安全。不安全的主要原因就是容器内的数据很容易被其他线程改变,或者可以简要概括为:一段时间差引发的血案。实际上,我们平时所做的业务系统,归根结底很多bug或者隐藏的缺陷都是由不起眼的一小段时间差引起的。

保证容器内的数据和操作都安全,一种简单而有效的方法就是将你所要进行的操作进行“事务”处理。比如示例1中哈希表的Value的遍历操作,通常情况下,我们分作两步:

(1)、(安全地)读取数据

(2)、(不安全地)遍历;

为了达到遍历操作不抛出异常,我们可以把两步合并为一步,抽象出一个线程安全的新方法TryGetAndEnumerate,这样可以保证线程安全地取数据和遍历,具体实现无非是lock一下SyncRoot类似的这种思路。但是这种线程安全的遍历可能代价很高,而且极其不通用。

线程安全集合容易产生的问题和解决方法,请参考JaredPar MSFT的Why are thread safe collections so hard?,这篇文章对设计一个线程安全的容器的指导原则是:

1、Don't add an decision procedures(procedures like Count as decision procedures).  They lead users down the path to bad code.
2、Methods which query the object can always fail and the API should reflect this.

实际上大家都知道利用事务处理思想多用TryXXX方法一般是没错的。


    
 
 

您可能感兴趣的文章:

  • 如何深入了解线程
  • 深入多线程之:深入分析Interlocked
  • 深入多线程之:解析线程的交会(Thread Rendezvous)详解
  • 深入分析父子线程、进程终止顺序不同产生的结果
  • 深入多线程之:深入生产者、消费者队列分析
  • 深入多线程之:双向信号与竞赛的用法分析
  • 深入多线程之:用Wait与Pulse模拟一些同步构造的应用详解
  • 深入Java线程中断的本质与编程原则的概述
  • 深入SQLite多线程的使用总结详解
  • 深入java线程池的使用详解 iis7站长之家
  • 深入多线程之:Wait与Pulse的使用详解
  • 深入探讨linux下进程的最大线程数、进程最大数、进程打开的文件数
  • Java 多线程同步 锁机制与synchronized深入解析
  • 深入Sqlite多线程入库的问题
  • 深入多线程之:Reader与Write Locks(读写锁)的使用详解
  • 深入Android线程的相关问题解惑
  • 深入java线程池的使用详解
  • 深入Android Handler与线程间通信ITC的详解
  • Java线程中断的本质深入理解
  • Android开发笔记之:深入理解多线程AsyncTask
  • Docker支持更深入的容器日志分析
  • 深入解析C++ STL中的常用容器
  • Java容器类的深入理解
  •  
    本站(WWW.)旨在分享和传播互联网科技相关的资讯和技术,将尽最大努力为读者提供更好的信息聚合和浏览方式。
    本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。












  • 相关文章推荐
  • 用C++实现strcpy(),返回一个char*类型的深入分析
  • 先序遍历二叉树的递归实现与非递归实现深入解析
  • 基于Java实现缓存Cache的深入分析
  • C++实现strcmp字符串比较的深入探讨
  • 深入理解goto语句的替代实现方式分析
  • 深入Ajax代理的Java Servlet的实现详解
  • 深入理解memmove()与memcpy()的区别以及实现方法
  • 深入Resource实现多语言支持的应用详解
  • 快速排序的深入详解以及java实现
  • C++中const的实现机制深入分析
  • 深入全排列算法及其实现方法
  • 深入C++实现函数itoa()的分析
  • 深入理解C#实现快捷键(系统热键)响应的方法
  • 深入Java Robot实现控制鼠标和键盘的方法详解
  • Assert(断言实现机制深入剖析)
  • Oracle数据块实现原理深入解读
  • Android 静默方式实现批量安装卸载应用程序的深入分析
  • 深入理解TextView实现Rich Text--在同一个TextView设置不同字体风格
  • 深入分析:用1K内存实现高效I/O的RandomAccessFile类的详解
  • 关于《深入浅出MFC》
  • Linux有没有什么好的高级的书,我要深入,
  • 深入理解linux内核
  • [100分]有没有关于binutils的深入的资料?或者深入底层的资料?
  • 深入理解PHP内核 TIPI
  • 想深入学习Java应该学习哪些东西
  • 哪位有《JSP深入编程》电子版?
  • 想要深入学习LINUX该学什么?
  • 100分求:哪儿有《深入理解linux内核》可供下哉!
  • 如何深入Linux的内核学习?
  • U-BOOT得掌握到什么程序,用不用深入去学
  • 想深入了解操作系统该怎么做


  • 站内导航:


    特别声明:169IT网站部分信息来自互联网,如果侵犯您的权利,请及时告知,本站将立即删除!

    ©2012-2021,,E-mail:www_#163.com(请将#改为@)

    浙ICP备11055608号-3