


What are atomic operations? An in-depth analysis of atomic operations in go
Mar 28, 2023 pm 07:04 PMIn some of our previous articles related to the sync
package, we should have also discovered that atomic operations are used in many places.
For example, sync.WaitGroup
, sync.Map
, and then sync.Pool
, these structures all have atomic operations in their implementation.
Atomic operations are a very important operation in concurrent programming. They can ensure concurrency safety and are very efficient.
This article will deeply explore the principles, usage scenarios, usage, etc. of atomic operations in go.
What are atomic operations?
Atomic operations are variable-level mutex locks.
If I were to explain in one sentence what an atomic operation is, it would be: Atomic operations are variable-level mutex locks. Simply put, at the same time, only one CPU can read or write variables.
When we want to make concurrent and safe modifications to a certain variable, in addition to using the officially provided Mutex
, we can also use the atomic operations of the sync/atomic
package.
It can ensure that the reading or modification of variables will not be affected by other coroutines.
We can use the following figure to represent it:
#Explanation: In the above figure, we have three CPU logical cores, among which CPU 1 is processing variablesv
performs atomic operations. At this time, CPU 2 and CPU 3 cannot perform any operations on v
.
After the CPU 1 operation is completed, CPU 2 and CPU 3 can obtain the latest value of v
.
From this perspective, we can regard the atomic operations in the
sync/atomic
package as variable-level mutex locks. That is to say, in go, when a coroutine performs an atomic operation on a variable, other coroutines cannot perform any operations on the variable until the coroutine operation is completed.
What are the usage scenarios of atomic operations?
Take a simple example to illustrate the usage scenario of atomic operations:
func TestAtomic(t *testing.T) { var sum = 0 var wg sync.WaitGroup wg.Add(1000) // 啟動(dòng) 1000 個(gè)協(xié)程,每個(gè)協(xié)程對(duì) sum 做加法操作 for i := 0; i < 1000; i++ { go func() { defer wg.Done() sum++ }() } // 等待所有的協(xié)程都執(zhí)行完畢 wg.Wait() fmt.Println(sum) // 這里輸出多少呢? }
We can run this code on our own computer to see what the output result is. . If nothing else, it should be different every time, and it shouldn’t be 1000. Why is this?
This is because when the CPU adds sum
, it needs to first read the current value of sum
into the CPU register, and then perform the addition. operation, and finally write it back to memory.
If two CPUs take the value of sum
at the same time, then both perform addition operations, and then both write them back to the memory, then the value of sum
will be overwritten. This results in incorrect results.
For example, the current sum
in the memory is 1, and then the two CPUs take this 1 for addition at the same time, and then both get the result 2,
Then the two CPUs write their respective calculation results back to the memory, so the sum
in the memory becomes 2 instead of 3.
In this scenario, we can use atomic operations to implement concurrent and safe addition operations:
func TestAtomic1(t *testing.T) { // 將 sum 的類型改成 int32,因?yàn)樵硬僮髦荒茚槍?duì) int32、int64、uint32、uint64、uintptr 這幾種類型 var sum int32 = 0 var wg sync.WaitGroup wg.Add(1000) // 啟動(dòng) 1000 個(gè)協(xié)程,每個(gè)協(xié)程對(duì) sum 做加法操作 for i := 0; i < 1000; i++ { go func() { defer wg.Done() // 將 sum++ 改成下面這樣 atomic.AddInt32(&sum, 1) }() } wg.Wait() fmt.Println(sum) // 輸出 1000 }
In the above example, we can get the result of 1000 every time we execute it.
Because when using atomic operations, only one CPU can read or write variables at the same time, so the above problem will not occur.
So in many places where concurrent reading and writing of variables is required, we can consider whether atomic operations can be used to achieve concurrent and safe operations (instead of using mutex locks, mutual exclusion locks, etc.) The lock exclusion efficiency is lower than that of atomic operations).
The usage scenarios of atomic operations are similar to mutex locks, but the difference is that our lock granularity is just a variable. In other words, when we do not allow multiple CPUs to read and write variables at the same time (to ensure that only one CPU can operate on variables at the same time), we can use atomic operations.
How are atomic operations implemented?
After reading the above introduction to atomic operations, do you think that atomic operations are amazing and that there is such a useful thing? So how is it achieved?
Generally, the implementation of atomic operations requires special CPU instructions or system calls. These instructions or system calls can ensure that they will not be interrupted by other operations or events during execution, thereby ensuring the atomicity of the operation.
例如,在 x86 架構(gòu)的 CPU 中,可以使用 LOCK
前綴來實(shí)現(xiàn)原子操作。LOCK
前綴可以與其他指令一起使用,用于鎖定內(nèi)存總線,防止其他 CPU 訪問同一內(nèi)存地址,從而實(shí)現(xiàn)原子操作。
在使用 LOCK
前綴的指令執(zhí)行期間,CPU 會(huì)將當(dāng)前處理器緩存中的數(shù)據(jù)寫回到內(nèi)存中,并鎖定該內(nèi)存地址,
防止其他 CPU 修改該地址的數(shù)據(jù)(所以原子操作總是可以讀取到最新的數(shù)據(jù))。
一旦當(dāng)前 CPU 對(duì)該地址的操作完成,CPU 會(huì)釋放該內(nèi)存地址的鎖定,其他 CPU 才能繼續(xù)對(duì)該地址進(jìn)行訪問。
x86 LOCK 的時(shí)候發(fā)生了什么
我們?cè)賮磙垡幌律厦娴膬?nèi)容,看看 LOCK
前綴是如何實(shí)現(xiàn)原子操作的:
- CPU 會(huì)將當(dāng)前處理器緩存中的數(shù)據(jù)寫回到內(nèi)存中。(因此我們總能讀取到最新的數(shù)據(jù))
- 然后鎖定該內(nèi)存地址,防止其他 CPU 修改該地址的數(shù)據(jù)。
- 一旦當(dāng)前 CPU 對(duì)該地址的操作完成,CPU 會(huì)釋放該內(nèi)存地址的鎖定,其他 CPU 才能繼續(xù)對(duì)該地址進(jìn)行訪問。
其他架構(gòu)的 CPU 可能會(huì)略有不同,但是原理是一樣的。
原子操作有什么特征?
- 不會(huì)被中斷:原子操作是一個(gè)不可分割的操作,要么全部執(zhí)行,要么全部不執(zhí)行,不會(huì)出現(xiàn)中間狀態(tài)。這是保證原子性的基本前提。同時(shí),原子操作過程中不會(huì)有上下文切換的過程。
- 操作對(duì)象是共享變量:原子操作通常是對(duì)共享變量進(jìn)行的,也就是說,多個(gè)協(xié)程可以同時(shí)訪問這個(gè)變量,因此需要采用原子操作來保證數(shù)據(jù)的一致性和正確性。
- 并發(fā)安全:原子操作是并發(fā)安全的,可以保證多個(gè)協(xié)程同時(shí)進(jìn)行操作時(shí)不會(huì)出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)問題(雖然說是同時(shí),但是實(shí)際上在操作那個(gè)變量的時(shí)候是互斥的)。
- 無需加鎖:原子操作不需要使用互斥鎖來保證數(shù)據(jù)的一致性和正確性,因此可以避免互斥鎖的使用帶來的性能損失。
- 適用場(chǎng)景比較局限:原子操作適用于操作單個(gè)變量,如果需要同時(shí)并發(fā)讀寫多個(gè)變量,可能需要考慮使用互斥鎖。
go 里面有哪些原子操作?
在 go 中,主要有以下幾種原子操作:Add
、CompareAndSwap
、Load
、Store
、Swap
。
增減(Add)
- 用于進(jìn)行增加或減少的原子操作,函數(shù)名以
Add
為前綴,后綴針對(duì)特定類型的名稱。 - 原子增被操作的類型只能是數(shù)值類型,即
int32
、int64
、uint32
、uint64
、uintptr
- 原子增減函數(shù)的第一個(gè)參數(shù)為原值,第二個(gè)參數(shù)是要增減多少。
- 方法:
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
int32
和int64
的第二個(gè)參數(shù)可以是負(fù)數(shù),這樣就可以做原子減法了。
比較并交換(CompareAndSwap)
也就是我們常見的 CAS
,在 CAS
操作中,會(huì)需要拿舊的值跟 old
比較,如果相等,就將 new
賦值給 addr
。
如果不相等,則不做任何操作。最后返回一個(gè) bool
值,表示是否成功 swap
。
也就是說,這個(gè)操作可能是不成功的。這很正常,在并發(fā)環(huán)境下,多個(gè)協(xié)程對(duì)同一個(gè)變量進(jìn)行操作,肯定會(huì)存在競(jìng)爭(zhēng)的情況。 在這種情況下,偶爾的失敗是正常的,我們只需要在失敗的時(shí)候,重新嘗試即可。 因?yàn)樵硬僮餍枰臅r(shí)間往往是比較短的,因此在失敗的時(shí)候,我們可以通過自旋的方式來再次進(jìn)行嘗試。
在這種情況下,如果不自旋,那就需要將這個(gè)協(xié)程掛起,等待其他協(xié)程完成操作,然后再次嘗試。這個(gè)過程相比自旋可能會(huì)更加耗時(shí)。 因?yàn)楹苡锌赡苓@次原子操作不成功,下一次就成功了。如果我們每次都將協(xié)程掛起,那么效率就會(huì)大大降低。
for
+ 原子操作的方式,在 go 的 sync
包中很多地方都有使用,比如 sync.Map
,sync.Pool
等。
這也是使用原子操作時(shí)一個(gè)非常常見的使用模式。
CompareAndSwap
的功能:
- 用于比較并交換的原子操作,函數(shù)名以
CompareAndSwap
為前綴,后綴針對(duì)特定類型的名稱。 - 原子比較并交換被操作的類型可以是數(shù)值類型或指針類型,即
int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
- 原子比較并交換函數(shù)的第一個(gè)參數(shù)為原值指針,第二個(gè)參數(shù)是要比較的值,第三個(gè)參數(shù)是要交換的值。
- 方法:
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
載入(Load)
原子性的讀取操作接受一個(gè)對(duì)應(yīng)類型的指針值,返回該指針指向的值。原子性讀取意味著讀取值的同時(shí),當(dāng)前計(jì)算機(jī)的任何 CPU 都不會(huì)進(jìn)行針對(duì)值的讀寫操作。
如果不使用原子 Load
,當(dāng)使用 v := value
這種賦值方式為變量 v
賦值時(shí),讀取到的 value
可能不是最新的,因?yàn)樵谧x取操作時(shí)其他協(xié)程對(duì)它的讀寫操作可能會(huì)同時(shí)發(fā)生。
Load 操作有下面這些:
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
存儲(chǔ)(Store)
Store
可以將 val
值保存到 *addr
中,Store
操作是原子性的,因此在執(zhí)行 Store
操作時(shí),當(dāng)前計(jì)算機(jī)的任何 CPU 都不會(huì)進(jìn)行針對(duì) *addr
的讀寫操作。
- 原子性存儲(chǔ)會(huì)將
val
值保存到*addr
中。 - 與讀操作對(duì)應(yīng)的寫入操作,
sync/atomic
提供了與原子值載入Load
函數(shù)相對(duì)應(yīng)的原子值存儲(chǔ)Store
函數(shù),原子性存儲(chǔ)函數(shù)均以Store
為前綴。
Store
操作有下面這些:
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintpre, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
交換(Swap)
Swap
跟 Store
有點(diǎn)類似,但是它會(huì)返回 *addr
的舊值。
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
原子操作任意類型的值 - atomic.Value
從上一節(jié)中,我們知道了在 go 中原子操作可以操作 int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
這些類型的值。
但是在實(shí)際開發(fā)中,我們的類型還有很多,比如 string
、struct
等等,那這些類型的值如何進(jìn)行原子操作呢?答案是使用 atomic.Value
。
atomic.Value
是一個(gè)結(jié)構(gòu)體,它的內(nèi)部有一個(gè) any
類型的字段,存儲(chǔ)了我們要原子操作的值,也就是一個(gè)任意類型的值。
atomic.Value
支持以下操作:
Load
:原子性的讀取Value
中的值。Store
:原子性的存儲(chǔ)一個(gè)值到Value
中。Swap
:原子性的交換Value
中的值,返回舊值。CompareAndSwap
:原子性的比較并交換Value
中的值,如果舊值和old
相等,則將new
存入Value
中,返回true
,否則返回false
。
atomic.Value
的這些操作跟上面講到的那些操作其實(shí)差不多,只不過 atomic.Value
可以操作任意類型的值。
那 atomic.Value
是如何實(shí)現(xiàn)的呢?
atomic.Value 源碼分析
atomic.Value
是一個(gè)結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體只有一個(gè)字段:
// Value 提供一致類型值的原子加載和存儲(chǔ)。 type Value struct { v any }
Load - 讀取
Load
返回由最近的 Store
設(shè)置的值。如果還沒有 Store
過任何值,則返回 nil
。
// Load 返回由最近的 Store 設(shè)置的值。 func (v *Value) Load() (val any) { // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // 判斷 atomic.Value 的類型 typ := LoadPointer(&vp.typ) // 第一次 Store 還沒有完成,直接返回 nil if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) { // firstStoreInProgress 是一個(gè)特殊的變量,存儲(chǔ)到 typ 中用來表示第一次 Store 還沒有完成 return nil } // 獲取 atomic.Value 的值 data := LoadPointer(&vp.data) // 將 val 轉(zhuǎn)換為 efaceWords 類型 vlp := (*efaceWords)(unsafe.Pointer(&val)) // 分別賦值給 val 的 typ 和 data vlp.typ = typ vlp.data = data return }
在 atomic.Value
的源碼中,我們都可以看到 efaceWords
的身影,它實(shí)際上代表的是 interface{}/any
類型:
// 表示一個(gè) interface{}/any 類型 type efaceWords struct { typ unsafe.Pointer data unsafe.Pointer }
看到這里我們會(huì)不會(huì)覺得很困惑,直接返回 val
不就可以了嗎?為什么要將 val
轉(zhuǎn)換為 efaceWords
類型呢?
這是因?yàn)?go 中的原子操作只能操作 int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
這些類型的值,
不支持 interface{}
類型,但是如果了解 interface{}
底層結(jié)構(gòu)的話,我們就知道 interface{}
底層其實(shí)就是一個(gè)結(jié)構(gòu)體,
它有兩個(gè)字段,一個(gè)是 type
,一個(gè)是 data
,type
用來存儲(chǔ) interface{}
的類型,data
用來存儲(chǔ) interface{}
的值。
而且這兩個(gè)字段都是 unsafe.Pointer
類型的,所以其實(shí)我們可以對(duì) interface{}
的 type
和 data
分別進(jìn)行原子操作,
這樣最終其實(shí)也可以達(dá)到了原子操作 interface{}
的目的了,是不是非常地巧妙呢?
Store - 存儲(chǔ)
Store
將 Value
的值設(shè)置為 val
。對(duì)給定值的所有存儲(chǔ)調(diào)用必須使用相同具體類型的值。不一致類型的存儲(chǔ)會(huì)發(fā)生恐慌,Store(nil)
也會(huì) panic
。
// Store 將 Value 的值設(shè)置為 val。 func (v *Value) Store(val any) { // 不能存儲(chǔ) nil 值 if val == nil { panic("sync/atomic: store of nil value into Value") } // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // val 轉(zhuǎn)換為 efaceWords vlp := (*efaceWords)(unsafe.Pointer(&val)) // 自旋進(jìn)行原子操作,這個(gè)過程不會(huì)很久,開銷相比互斥鎖小 for { // LoadPointer 可以保證獲取到的是最新的 typ := LoadPointer(&vp.typ) // 第一次 store 的時(shí)候 typ 還是 nil,說明是第一次 store if typ == nil { // 嘗試開始第一次 Store。 // 禁用搶占,以便其他 goroutines 可以自旋等待完成。 // (如果允許搶占,那么其他 goroutine 自旋等待的時(shí)間可能會(huì)比較長(zhǎng),因?yàn)榭赡軙?huì)需要進(jìn)行協(xié)程調(diào)度。) runtime_procPin() // 搶占失敗,意味著有其他 goroutine 成功 store 了,允許搶占,再次嘗試 Store // 這也是一個(gè)原子操作。 if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { runtime_procUnpin() continue } // 完成第一次 store // 因?yàn)橛?firstStoreInProgress 標(biāo)識(shí)的保護(hù),所以下面的兩個(gè)原子操作是安全的。 StorePointer(&vp.data, vlp.data) // 存儲(chǔ)值(原子操作) StorePointer(&vp.typ, vlp.typ) // 存儲(chǔ)類型(原子操作) runtime_procUnpin() // 允許搶占 return } // 另外一個(gè) goroutine 正在進(jìn)行第一次 Store。自旋等待。 if typ == unsafe.Pointer(&firstStoreInProgress) { continue } // 第一次 Store 已經(jīng)完成了,下面不是第一次 Store 了。 // 需要檢查當(dāng)前 Store 的類型跟第一次 Store 的類型是否一致,不一致就 panic。 if typ != vlp.typ { panic("sync/atomic: store of inconsistently typed value into Value") } // 后續(xù)的 Store 只需要 Store 值部分就可以了。 // 因?yàn)?atomic.Value 只能保存一種類型的值。 StorePointer(&vp.data, vlp.data) return } }
在 Store
中,有以下幾個(gè)注意的點(diǎn):
- 使用
firstStoreInProgress
來確保第一次Store
的時(shí)候,只有一個(gè)goroutine
可以進(jìn)行Store
操作,其他的goroutine
需要自旋等待。如果沒有這個(gè)保護(hù),那么存儲(chǔ)typ
和data
的時(shí)候就會(huì)出現(xiàn)競(jìng)爭(zhēng)(因?yàn)樾枰獌蓚€(gè)原子操作),導(dǎo)致數(shù)據(jù)不一致。在這里其實(shí)可以將firstStoreInProgress
看作是一個(gè)互斥鎖。 - 在進(jìn)行第一次
Store
的時(shí)候,會(huì)將當(dāng)前的 goroutine 和P
綁定,這樣拿到firstStoreInProgress
鎖的協(xié)程就可以盡快地完成第一次Store
操作,這樣一來,其他的協(xié)程也不用等待太久。 - 在第一次
Store
的時(shí)候,會(huì)有兩個(gè)原子操作,分別存儲(chǔ)類型和值,但是因?yàn)橛?firstStoreInProgress
的保護(hù),所以這兩個(gè)原子操作本質(zhì)上是對(duì)interface{}
的一個(gè)原子存儲(chǔ)操作。 - 其他協(xié)程在看到有
firstStoreInProgress
標(biāo)識(shí)的時(shí)候,就會(huì)自旋等待,直到第一次Store
完成。 - 在后續(xù)的
Store
操作中,只需要存儲(chǔ)值就可以了,因?yàn)?atomic.Value
只能保存一種類型的值。
Swap - 交換
Swap
將 Value
的值設(shè)置為 new
并返回舊值。對(duì)給定值的所有交換調(diào)用必須使用相同具體類型的值。同時(shí),不一致類型的交換會(huì)發(fā)生恐慌,Swap(nil)
也會(huì) panic
。
// Swap 將 Value 的值設(shè)置為 new 并返回舊值。 func (v *Value) Swap(new any) (old any) { // 不能存儲(chǔ) nil 值 if new == nil { panic("sync/atomic: swap of nil value into Value") } // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // new 轉(zhuǎn)換為 efaceWords np := (*efaceWords)(unsafe.Pointer(&new)) // 自旋進(jìn)行原子操作,這個(gè)過程不會(huì)很久,開銷相比互斥鎖小 for { // 下面這部分代碼跟 Store 一樣,不細(xì)說了。 // 這部分代碼是進(jìn)行第一次存儲(chǔ)的代碼。 typ := LoadPointer(&vp.typ) if typ == nil { runtime_procPin() if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { runtime_procUnpin() continue } StorePointer(&vp.data, np.data) StorePointer(&vp.typ, np.typ) runtime_procUnpin() return nil } if typ == unsafe.Pointer(&firstStoreInProgress) { continue } if typ != np.typ { panic("sync/atomic: swap of inconsistently typed value into Value") } // ---- 下面是 Swap 的特有邏輯 ---- // op 是返回值 op := (*efaceWords)(unsafe.Pointer(&old)) // 返回舊的值 op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data) return old } }
CompareAndSwap - 比較并交換
CompareAndSwap
將 Value
的值與 old
比較,如果相等則設(shè)置為 new
并返回 true
,否則返回 false
。
對(duì)給定值的所有比較和交換調(diào)用必須使用相同具體類型的值。同時(shí),不一致類型的比較和交換會(huì)發(fā)生恐慌,CompareAndSwap(nil, nil)
也會(huì) panic
。
// CompareAndSwap 比較并交換。 func (v *Value) CompareAndSwap(old, new any) (swapped bool) { // 注意:old 是可以為 nil 的,new 不能為 nil。 // old 是 nil 表示是第一次進(jìn)行 Store 操作。 if new == nil { panic("sync/atomic: compare and swap of nil value into Value") } // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // new 轉(zhuǎn)換為 efaceWords np := (*efaceWords)(unsafe.Pointer(&new)) // old 轉(zhuǎn)換為 efaceWords op := (*efaceWords)(unsafe.Pointer(&old)) // old 和 new 類型必須一致,且不能為 nil if op.typ != nil && np.typ != op.typ { panic("sync/atomic: compare and swap of inconsistently typed values") } // 自旋進(jìn)行原子操作,這個(gè)過程不會(huì)很久,開銷相比互斥鎖小 for { // LoadPointer 可以保證獲取到的 typ 是最新的 typ := LoadPointer(&vp.typ) if typ == nil { // atomic.Value 是 nil,還沒 Store 過 // 準(zhǔn)備進(jìn)行第一次 Store,但是傳遞進(jìn)來的 old 不是 nil,compare 這一步就失敗了。直接返回 false if old != nil { return false } // 下面這部分代碼跟 Store 一樣,不細(xì)說了。 // 這部分代碼是進(jìn)行第一次存儲(chǔ)的代碼。 runtime_procPin() if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { runtime_procUnpin() continue } StorePointer(&vp.data, np.data) StorePointer(&vp.typ, np.typ) runtime_procUnpin() return true } if typ == unsafe.Pointer(&firstStoreInProgress) { continue } if typ != np.typ { panic("sync/atomic: compare and swap of inconsistently typed value into Value") } // 通過運(yùn)行時(shí)相等性檢查比較舊版本和當(dāng)前版本。 // 這允許對(duì)值類型進(jìn)行比較,這是包函數(shù)所沒有的。 // 下面的 CompareAndSwapPointer 僅確保 vp.data 自 LoadPointer 以來沒有更改。 data := LoadPointer(&vp.data) var i any (*efaceWords)(unsafe.Pointer(&i)).typ = typ (*efaceWords)(unsafe.Pointer(&i)).data = data if i != old { // atomic.Value 跟 old 不相等 return false } // 只做 val 部分的 cas 操作 return CompareAndSwapPointer(&vp.data, data, np.data) } }
這里需要特別說明的只有最后那個(gè)比較相等的判斷,也就是 data := LoadPointer(&vp.data)
以及往后的幾行代碼。
在開發(fā) atomic.Value
第一版的時(shí)候,那個(gè)開發(fā)者其實(shí)是將這幾行寫成 CompareAndSwapPointer(&vp.data, old.data, np.data)
這種形式的。
但是在舊的寫法中,會(huì)存在一個(gè)問題,如果我們做 CAS
操作的時(shí)候,如果傳遞的參數(shù) old
是一個(gè)結(jié)構(gòu)體的值這種類型,那么這個(gè)結(jié)構(gòu)體的值是會(huì)被拷貝一份的,
同時(shí)再會(huì)被轉(zhuǎn)換為 interface{}/any
類型,這個(gè)過程中,其實(shí)參數(shù)的 old
的 data
部分指針指向的內(nèi)存跟 vp.data
指向的內(nèi)存是不一樣的。
這樣的話,CAS
操作就會(huì)失敗,這個(gè)時(shí)候就會(huì)返回 false
,但是我們本意是要比較它的值,出現(xiàn)這種結(jié)果顯然不是我們想要的。
將值作為
interface{}
參數(shù)使用的時(shí)候,會(huì)存在一個(gè)將值轉(zhuǎn)換為interface{}
的過程。具體我們可以看看interface{}
的實(shí)現(xiàn)原理。
所以,在上面的實(shí)現(xiàn)中,會(huì)將舊值的 typ
和 data
賦值給一個(gè) any
類型的變量,
然后使用 i != old
這種方式進(jìn)行判斷,這樣就可以實(shí)現(xiàn)在比較的時(shí)候,比較的是值,而不是由值轉(zhuǎn)換為 interface{}
后的指針。
其他原子類型
我們現(xiàn)在知道了,atomic.Value
可以對(duì)任意類型做原子操作。
而對(duì)于其他的原子類型,比如 int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
等,
其實(shí)在 go 中也提供了包裝的類型,讓我們可以以對(duì)象的方式來操作這些類型。
對(duì)應(yīng)的類型如下:
atomic.Bool
:這個(gè)比較特別,但底層實(shí)際上是一個(gè)uint32
類型的值。我們對(duì)atomic.Bool
做原子操作的時(shí)候,實(shí)際上是對(duì)uint32
做原子操作。atomic.Int32
:int32
類型的包裝類型atomic.Int64
:int64
類型的包裝類型atomic.Uint32
:uint32
類型的包裝類型atomic.Uint64
:uint64
類型的包裝類型atomic.Uintptr
:uintptr
類型的包裝類型atomic.Pointer
:unsafe.Pointer
類型的包裝類型
這幾種類型的實(shí)現(xiàn)的代碼基本一樣,除了類型不一樣,我們可以看看 atomic.Int32
的實(shí)現(xiàn):
// An Int32 is an atomic int32. The zero value is zero. type Int32 struct { _ noCopy v int32 } // Load atomically loads and returns the value stored in x. func (x *Int32) Load() int32 { return LoadInt32(&x.v) } // Store atomically stores val into x. func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) } // Swap atomically stores new into x and returns the previous value. func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) } // CompareAndSwap executes the compare-and-swap operation for x. func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) { return CompareAndSwapInt32(&x.v, old, new) }
可以看到,atomic.Int32
的實(shí)現(xiàn)都是基于 atomic
包中 int32
類型相關(guān)的原子操作函數(shù)來實(shí)現(xiàn)的。
原子操作與互斥鎖比較
那我們有了互斥鎖,為什么還要有原子操作呢?我們進(jìn)行比較一下就知道了:
原子操作 | 互斥鎖 | |
---|---|---|
保護(hù)的范圍 | 變量 | 代碼塊 |
保護(hù)的粒度 | 小 | 大 |
性能 | 高 | 低 |
如何實(shí)現(xiàn)的 | 硬件指令 | 軟件層面實(shí)現(xiàn),邏輯較多 |
如果我們只需要對(duì)某一個(gè)變量做并發(fā)讀寫,那么使用原子操作就可以了,因?yàn)樵硬僮鞯男阅鼙然コ怄i高很多。 但是如果我們需要對(duì)多個(gè)變量做并發(fā)讀寫,那么就需要用到互斥鎖了,這種場(chǎng)景往往是在一段代碼中對(duì)不同變量做讀寫。
性能比較
我們前面這個(gè)表格提到了原子操作與互斥鎖性能上有差異,我們寫幾行代碼來進(jìn)行比較一下:
// 系統(tǒng)信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz // 10.13 ns/op func BenchmarkMutex(b *testing.B) { var mu sync.Mutex for i := 0; i < b.N; i++ { mu.Lock() mu.Unlock() } } // 5.849 ns/op func BenchmarkAtomic(b *testing.B) { var sum atomic.Uint64 for i := 0; i < b.N; i++ { sum.Add(uint64(1)) } }
在對(duì) Mutex
的性能測(cè)試中,我只是寫了簡(jiǎn)單的 Lock()
和 UnLock()
操作,因?yàn)檫@種比較才算是對(duì) Mutex
本身的測(cè)試,而在 Atomic
的性能測(cè)試中,對(duì) sum
做原子累加的操作。最終結(jié)果是,使用 Atomic
的操作耗時(shí)大概比 Mutex
少了 40%
以上。
在實(shí)際開發(fā)中,
Mutex
保護(hù)的臨界區(qū)內(nèi)往往有更多操作,也就意味著Mutex
鎖需要耗費(fèi)更長(zhǎng)的時(shí)間才能釋放,也就是會(huì)需要耗費(fèi)比上面這個(gè)40%
還要多的時(shí)間另外一個(gè)協(xié)程才能獲取到Mutex
鎖。
go 的 sync 包中的原子操作
在文章的開頭,我們就說了,在 go 的 sync.Map
和 sync.Pool
中都有用到了原子操作,本節(jié)就來看一看這些操作。
sync.Map 中的原子操作
在 sync.Map
中使用到了一個(gè) entry
結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體中大部分操作都是原子操作,我們可以看看它下面這兩個(gè)方法的定義:
// 刪除 entry func (e *entry) delete() (value any, ok bool) { for { p := e.p.Load() // 已經(jīng)被刪除了,不需要再刪除 if p == nil || p == expunged { return nil, false } // 刪除成功 if e.p.CompareAndSwap(p, nil) { return *p, true } } } // 如果條目尚未刪除,trySwap 將交換一個(gè)值。 func (e *entry) trySwap(i *any) (*any, bool) { for { p := e.p.Load() // 已經(jīng)被刪除了 if p == expunged { return nil, false } // swap 成功 if e.p.CompareAndSwap(p, i) { return p, true } } }
我們可以看到一個(gè)非常典型的特征就是 for
+ CompareAndSwap
的組合,這個(gè)組合在 entry
中出現(xiàn)了很多次。
如果我們也需要對(duì)變量做并發(fā)讀寫,也可以嘗試一下這種 for + CompareAndSwap 的組合。
sync.WaitGroup 中的原子操作
在 sync.WaitGroup
中有一個(gè)類型為 atomic.Uint64
的 state
字段,這個(gè)變量是用來記錄 WaitGroup
的狀態(tài)的。
在實(shí)際使用中,它的高 32 位用來記錄 WaitGroup
的計(jì)數(shù)器,低 32 位用來記錄 WaitGroup
的 Waiter
的數(shù)量,也就是等待條件變量滿足的協(xié)程數(shù)量。
如果不使用一個(gè)變量來記錄這兩個(gè)值,那么我們就需要使用兩個(gè)變量來記錄,這樣就會(huì)導(dǎo)致我們需要對(duì)兩個(gè)變量做并發(fā)讀寫, 在這種情況下,我們就需要使用互斥鎖來保護(hù)這兩個(gè)變量,這樣就會(huì)導(dǎo)致性能的下降。
而使用一個(gè)變量來記錄這兩個(gè)值,我們就可以使用原子操作來保護(hù)這個(gè)變量,這樣就可以保證并發(fā)讀寫的安全性,同時(shí)也能得到更好的性能:
// WaitGroup 的 Add 函數(shù):高 32 位加上 delta state := wg.state.Add(uint64(delta) << 32) // WaitGroup 的 Wait 函數(shù):低 32 位加 1 // 等待者的數(shù)量加 1 wg.state.CompareAndSwap(state, state+1)
CAS 操作有失敗必然有成功
當(dāng)然這里是指指向同一行 CAS
代碼的時(shí)候(也就是有競(jìng)爭(zhēng)的時(shí)候),如果是指向不同行 CAS
代碼的時(shí)候,那么就不一定了。
比如下面這個(gè)例子,我們把前面計(jì)算 sum
的例子改一改,改成用 CAS
操作來完成:
func TestCas(t *testing.T) { var sum int32 = 0 var wg sync.WaitGroup wg.Add(1000) for i := 0; i < 1000; i++ { go func() { defer wg.Done() // 這一行是有可能會(huì)失敗的 atomic.CompareAndSwapInt32(&sum, sum, sum+1) }() } wg.Wait() fmt.Println(sum) // 不是 1000 }
在這個(gè)例子中,我們把 atomic.AddInt32(&sum, 1)
改成了 atomic.CompareAndSwapInt32(&sum, sum, sum+1)
,
這樣就會(huì)導(dǎo)致有可能會(huì)有多個(gè) goroutine 同時(shí)執(zhí)行到 atomic.CompareAndSwapInt32(&sum, sum, sum+1)
這一行代碼,
這樣肯定會(huì)有不同的 goroutine 同時(shí)拿到一個(gè)相同的 sum
的舊值,那么在這種情況下,就會(huì)導(dǎo)致 CAS
操作失敗。
也就是說,將 sum
替換為 sum + 1
的操作可能會(huì)失敗。
失敗意味著什么呢?意味著另外一個(gè)協(xié)程序先把 sum
的值加 1 了,這個(gè)時(shí)候其實(shí)我們不應(yīng)該在舊的 sum
上加 1 了,
而是應(yīng)該在最新的 sum
上加上 1,那我們應(yīng)該怎么做呢?我們可以在 CAS
操作失敗的時(shí)候,重新獲取 sum
的值,
然后再次嘗試 CAS
操作,直到成功為止:
func TestCas(t *testing.T) { var sum int32 = 0 var wg sync.WaitGroup wg.Add(1000) for i := 0; i < 1000; i++ { go func() { defer wg.Done() // cas 失敗的時(shí)候,重新獲取 sum 的值進(jìn)行計(jì)算。 // cas 成功則返回。 for { if atomic.CompareAndSwapInt32(&sum, sum, sum+1) { return } } }() } wg.Wait() fmt.Println(sum) }
總結(jié)
原子操作是并發(fā)編程中非常重要的一個(gè)概念,它可以保證并發(fā)讀寫的安全性,同時(shí)也能得到更好的性能。
最后,總結(jié)一下本文講到的內(nèi)容:
- Atomic operations are lower-level operations that protect a single variable, while mutex locks can protect a piece of code. Their usage scenarios are different.
- Atomic operations need to be implemented through CPU instructions, while mutex locks are implemented at the software level.
- The atomic operations in go include the following:
-
#Add
: Atomic increase and decrease -
CompareAndSwap
: Atomic comparison and exchange -
Load
: Atomic read -
Store
: Atomic write -
Swap
: Atomic swap
-
- All types in go can use atomic operations, but different types of atomic operations use different functions.
-
atomic.Value
can be used to atomically operate variables of any type. - Some underlying implementations in go also use atomic operations, such as:
-
sync.WaitGroup
: Use atomic operations to ensure the safety of concurrent reading and writing of counters and waiters. sex. -
sync.Map
:entry
Basically all operations in the structure have atomic operations.
-
- If the atomic operation fails, it must succeed (it is talking about the
CAS
operation in the same line). If theCAS
operation fails, then we can Re-obtain the old value and try theCAS
operation again until it succeeds.
In general, the atomic operation itself does not have too complicated logic. After we understand its principles, we can use it easily.
Recommended learning: Golang tutorial
The above is the detailed content of What are atomic operations? An in-depth analysis of atomic operations in go. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undress AI Tool
Undress images for free

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

In Go language, calling a structure method requires first defining the structure and the method that binds the receiver, and accessing it using a point number. After defining the structure Rectangle, the method can be declared through the value receiver or the pointer receiver; 1. Use the value receiver such as func(rRectangle)Area()int and directly call it through rect.Area(); 2. If you need to modify the structure, use the pointer receiver such as func(r*Rectangle)SetWidth(...), and Go will automatically handle the conversion of pointers and values; 3. When embedding the structure, the method of embedded structure will be improved, and it can be called directly through the outer structure; 4. Go does not need to force use getter/setter,

In Go, an interface is a type that defines behavior without specifying implementation. An interface consists of method signatures, and any type that implements these methods automatically satisfy the interface. For example, if you define a Speaker interface that contains the Speak() method, all types that implement the method can be considered Speaker. Interfaces are suitable for writing common functions, abstract implementation details, and using mock objects in testing. Defining an interface uses the interface keyword and lists method signatures, without explicitly declaring the type to implement the interface. Common use cases include logs, formatting, abstractions of different databases or services, and notification systems. For example, both Dog and Robot types can implement Speak methods and pass them to the same Anno

InGo,ifstatementsexecutecodebasedonconditions.1.Basicstructurerunsablockifaconditionistrue,e.g.,ifx>10{...}.2.Elseclausehandlesfalseconditions,e.g.,else{...}.3.Elseifchainsmultipleconditions,e.g.,elseifx==10{...}.4.Variableinitializationinsideif,l

Go's time package provides functions for processing time and duration, including obtaining the current time, formatting date, calculating time difference, processing time zone, scheduling and sleeping operations. To get the current time, use time.Now() to get the Time structure, and you can extract specific time information through Year(), Month(), Day() and other methods; use Format("2006-01-0215:04:05") to format the time string; when calculating the time difference, use Sub() or Since() to obtain the Duration object, and then convert it into the corresponding unit through Seconds(), Minutes(), and Hours();

Gohandlesconcurrencyusinggoroutinesandchannels.1.GoroutinesarelightweightfunctionsmanagedbytheGoruntime,enablingthousandstorunconcurrentlywithminimalresourceuse.2.Channelsprovidesafecommunicationbetweengoroutines,allowingvaluestobesentandreceivedinas

Use bit operators to operate specific bits of integers in Go language, suitable for processing flag bits, underlying data, or optimization operations. 1. Use & (bit-wise) to check whether a specific bit is set; 2. Use

The standard way to protect critical areas in Go is to use the Lock() and Unlock() methods of sync.Mutex. 1. Declare a mutex and use it with the data to be protected; 2. Call Lock() before entering the critical area to ensure that only one goroutine can access the shared resources; 3. Use deferUnlock() to ensure that the lock is always released to avoid deadlocks; 4. Try to shorten operations in the critical area to improve performance; 5. For scenarios where more reads and less writes, sync.RWMutex should be used, read operations through RLock()/RUnlock(), and write operations through Lock()/Unlock() to improve concurrency efficiency.

A switch statement in Go is a control flow tool that executes different code blocks based on the value of a variable or expression. 1. Switch executes corresponding logic by matching cases, and does not support the default fall-through; 2. The conditions can be omitted and Boolean expressions are used as case judgment; 3. A case can contain multiple values, separated by commas; 4. Support type judgment (typeswitch), which is used to dynamically check the underlying types of interface variables. This makes switch easier and more efficient than long chain if-else when dealing with multi-condition branches, value grouping and type checking.
