文章出處

Disruptor-NET算法(是一種無鎖算法)需要我們自己實現某一種特定的內存操作的語義以保證算法的正確性。這時我們就需要顯式的使用一些指令來控制內存操作指令的順序以及其可見性定義。這種指令稱為內存柵欄。

內存一致性模型需要在各種的程序與系統的各個層次上定義內存訪問的行為。在機器碼與的層次上,其定義將影響硬件的設計者以及機器碼開發人員;而在高級語言層次上,其定義將影響高級語言開發人員以及編譯器開發人員和硬件設計人員。即,內存操作的亂序在各個層次都是存在的。這里,所謂的程序的執行順序有三種:

(1)程序順序:指在特定CPU上運行的,執行內存操作的代碼的順序。這指的是編譯好的程序二進制鏡像中的指令的順序。編譯器并不一定嚴格按照程序的順序進行二進制代碼的編排。編譯器可以按照既定的規則,在執行代碼優化的時候打亂指令的執行順序,也就是上面說的程序順序。并且,編譯器可以根據程序的特定行為進行性能優化,這種優化可能改變算法的形式與算法的執行復雜度。(例如將switch轉化為表驅動序列)
(2)執行順序:指在CPU上執行的獨立的內存相關的代碼執行的順序。執行順序和程序順序可能不同,這種不同是編譯器和CPU優化造成的結果。CPU在執行期(Runtime)根據自己的內存模型(跟編譯器無關)打亂已經編譯好了的指令的順序,以達到程序的優化和最大限度的資源利用。

(3)感知順序:指特定的CPU感知到他自身的或者其他CPU對內存進行操作的順序。感知順序和執行順序可能還不一樣。這是由于緩存優化或者內存優化系統造成的。

而最終的共享內存模型的表現形式是由這三種“順序”共同決定的。即從源代碼到最終執行進行了至少三個層次上的代碼順序調整,分別是由編譯器和CPU完成的。我們上面提到,這種代碼執行順序的改變雖然在單線程程序中不會引發副作用,但是在多線程程序中,這種作用是不能夠被忽略的,甚至可能造成完全錯誤的結果。因此,在多線程程序中,我們有時需要人為的限制內存執行的順序。而這種限制是通過不同層次的內存柵欄完成的。

Thread.MemoryBarrier就是采用了CPU提供的某些特定的指令的內存柵欄,下面是msdn的解釋【http://msdn.microsoft.com/zh-cn/library/vstudio/system.threading.thread.memorybarrier(v=vs.100).aspx】:

Thread.MemoryBarrier: 按如下方式同步內存訪問:執行當前線程的處理器在對指令重新排序時,不能采用先執行 MemoryBarrier 調用之后的內存訪問,再執行 MemoryBarrier 調用之前的內存訪問的方式。

按照我個人的理解:就是寫完數據之后,調用MemoryBarrier,數據就會立即刷新,另外在讀取數據之前調用MemoryBarrier可以確保讀取的數據是最新的,并且處理器對MemoryBarrier的優化小心處理。

        int _answer;
        bool _complete;

        void A()
        {
            _answer = 123;
            Thread.MemoryBarrier(); //在寫完之后,創建內存柵欄
            _complete = true;
            Thread.MemoryBarrier();//在寫完之后,創建內存柵欄       
       }

        void B()
        {
            Thread.MemoryBarrier();//在讀取之前,創建內存柵欄
            if (_complete)
            {
                Thread.MemoryBarrier();//在讀取之前,創建內存柵欄
                Console.WriteLine(_answer);
            }
        }

Disruptor-NET正是通過Thread.MemoryBarrier 實現無鎖和線程安全的內存操作,看下面是他的Atomic的Volatile類對常用數據類型的封裝,volatile可以阻止代碼重排序,并且值被更新的時候,會導致緩存失效,強制回寫到主存中。

image

/// <summary>
        /// An integer value that may be updated atomically
        /// </summary>
        public struct Integer
        {
            private int _value;

            /// <summary>
            /// Create a new <see cref="Integer"/> with the given initial value.
            /// </summary>
            /// <param name="value">Initial value</param>
            public Integer(int value)
            {
                _value = value;
            }

            /// <summary>
            /// Read the value without applying any fence
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadUnfenced()
            {
                return _value;
            }

            /// <summary>
            /// Read the value applying acquire fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadAcquireFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// <summary>
            /// Read the value applying full fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadFullFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// <summary>
            /// Read the value applying a compiler only fence, no CPU fence is applied
            /// </summary>
            /// <returns>The current value</returns>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public int ReadCompilerOnlyFence()
            {
                return _value;
            }

            /// <summary>
            /// Write the value applying release fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteReleaseFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// <summary>
            /// Write the value applying full fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteFullFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// <summary>
            /// Write the value applying a compiler fence only, no CPU fence is applied
            /// </summary>
            /// <param name="newValue">The new value</param>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public void WriteCompilerOnlyFence(int newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Write without applying any fence
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteUnfenced(int newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Atomically set the value to the given updated value if the current value equals the comparand
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <param name="comparand">The comparand (expected value)</param>
            /// <returns></returns>
            public bool AtomicCompareExchange(int newValue, int comparand)
            {
                return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
            }

            /// <summary>
            /// Atomically set the value to the given updated value
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <returns>The original value</returns>
            public int AtomicExchange(int newValue)
            {
                return Interlocked.Exchange(ref _value, newValue);
            }

            /// <summary>
            /// Atomically add the given value to the current value and return the sum
            /// </summary>
            /// <param name="delta">The value to be added</param>
            /// <returns>The sum of the current value and the given value</returns>
            public int AtomicAddAndGet(int delta)
            {
                return Interlocked.Add(ref _value, delta);
            }

            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The incremented value.</returns>
            public int AtomicIncrementAndGet()
            {
                return Interlocked.Increment(ref _value);
            }

            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The decremented value.</returns>
            public int AtomicDecrementAndGet()
            {
                return Interlocked.Decrement(ref _value);
            }

            /// <summary>
            /// Returns the string representation of the current value.
            /// </summary>
            /// <returns>the string representation of the current value.</returns>
            public override string ToString()
            {
                var value = ReadFullFence();
                return value.ToString();
            }
        }
        /// <summary>
        /// An integer value that may be updated atomically and is guaranteed to live on its own cache line (to prevent false sharing)
        /// </summary>
        [StructLayout(LayoutKind.Explicit, Size = CacheLineSize * 2)]
        public struct PaddedInteger
        {
            [FieldOffset(CacheLineSize)]
            private int _value;
 
            /// <summary>
            /// Create a new <see cref="PaddedInteger"/> with the given initial value.
            /// </summary>
            /// <param name="value">Initial value</param>
            public PaddedInteger(int value)
            {
                _value = value;
            }
 
            /// <summary>
            /// Read the value without applying any fence
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadUnfenced()
            {
                return _value;
            }
 
            /// <summary>
            /// Read the value applying acquire fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadAcquireFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }
 
            /// <summary>
            /// Read the value applying full fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadFullFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }
 
            /// <summary>
            /// Read the value applying a compiler only fence, no CPU fence is applied
            /// </summary>
            /// <returns>The current value</returns>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public int ReadCompilerOnlyFence()
            {
                return _value;
            }
 
            /// <summary>
            /// Write the value applying release fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteReleaseFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }
 
            /// <summary>
            /// Write the value applying full fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteFullFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }
 
            /// <summary>
            /// Write the value applying a compiler fence only, no CPU fence is applied
            /// </summary>
            /// <param name="newValue">The new value</param>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public void WriteCompilerOnlyFence(int newValue)
            {
                _value = newValue;
            }
 
            /// <summary>
            /// Write without applying any fence
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteUnfenced(int newValue)
            {
                _value = newValue;
            }
 
            /// <summary>
            /// Atomically set the value to the given updated value if the current value equals the comparand
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <param name="comparand">The comparand (expected value)</param>
            /// <returns></returns>
            public bool AtomicCompareExchange(int newValue, int comparand)
            {
                return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
            }
 
            /// <summary>
            /// Atomically set the value to the given updated value
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <returns>The original value</returns>
            public int AtomicExchange(int newValue)
            {
                return Interlocked.Exchange(ref _value, newValue);
            }
 
            /// <summary>
            /// Atomically add the given value to the current value and return the sum
            /// </summary>
            /// <param name="delta">The value to be added</param>
            /// <returns>The sum of the current value and the given value</returns>
            public int AtomicAddAndGet(int delta)
            {
                return Interlocked.Add(ref _value, delta);
            }
 
            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The incremented value.</returns>
            public int AtomicIncrementAndGet()
            {
                return Interlocked.Increment(ref _value);
            }
 
            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The decremented value.</returns>
            public int AtomicDecrementAndGet()
            {
                return Interlocked.Decrement(ref _value);
            }
 
            /// <summary>
            /// Returns the string representation of the current value.
            /// </summary>
            /// <returns>the string representation of the current value.</returns>
            public override string ToString()
            {
                var value = ReadFullFence();
                return value.ToString();
            }
        }

PaddedInteger類,它使用了7個Integer,加上一個對象頭,剛好64個字節。

Disruptor-net  3.2.0

disruptor學習筆記

Disruptor原理剖析  

剖析Disruptor:為什么會這么快?(二)神奇的緩存行填充

深入淺出多線程系列之八:內存柵欄和volatile 關鍵字


文章列表




Avast logo

Avast 防毒軟體已檢查此封電子郵件的病毒。
www.avast.com


arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()