Синхронізація в Go: використання спільних даних
Привіт, мене звати Ярослав. Працюю в компанії Evrius , три роки розробляю на Go, а раніше писав на PHP.
Помітив, що коли на співбесіді з Go питають про синхронізацію, то переважно запитання звучить: «Як розпаралелити задачу?». Про це я писав раніше . Але на співбесіді питають про одне, а в проєкті — інше, там значно більше випадків, коли дані читаються з багатьох горутин, а оновлюють в одній. Тоді краще використовувати оптимальні структури sync.RWMutex та atomic.Value. Про це й буде стаття. Тут ви знайдете приклади коду, помилок, тести, бенчмарки.
Матеріал буде цікавий спеціалістам, які збираються перекваліфікуватись на Go або вже мають досвід з цієї мовою та хочуть краще структурувати свої знання.
Власне, я згадав про PHP, бо маю багато знайомих PHP-розробників і інколи відповідаю на запитання «Як перекваліфікуватись з PHP на Go». Також інколи чую, як PHP-розробникам зі знанням Go, рекрутери пропонують розглянути вакансії Golang Team Lead . Якщо вам буде цікаво почитати про те, як перекваліфікуватись з PHP на Go, дайте знати про це в коментарях чи напишіть мені в LinkedIn, щоб я розумів актуальність питання і мав мотивацію взятись за статтю з прикладами коду та порівнянням.
Безпечне читання даних без синхронізації
Коли в коді просто читаємо завчасно завантажені дані без оновлень з багатьох горутин, то синхронізація зайва. Штучний приклад, коли багато горутин читають одні дані:
package main
import (
"fmt"
)
func main() {
var values = map[byte]int{
'A': 1,
'B': 2,
'C': 3,
}
var keys = []byte{'A', 'B', 'C'}
// panic: too many concurrent operations on a single file or socket (max 1048575)
for try := 1048575; try > 0; try-- {
go func() {
for i, key := range keys {
// safe goroutine read data without any sync
var value = values[key]
fmt.Printf("index = %d, key = %c, value = %d
", i, key, value)
}
}()
}
}
Я запустив цей приклад з параметром race (для перевірки на data race): go run main.go -race.
Якщо ж у першій горутині будемо читати з мапи, а в другій — писати в мапу, то отримаємо помилку fatal error: concurrent map read and map write . Про цю помилку також написано в офіційному блозі Go maps in action :
Maps are not safe for concurrent use : it’s not defined what happens when you read and write to them simultaneously.Розглянемо її детальніше.
Fatal error: concurrent map read and map write
Якщо взяти попередній приклад і додати одну горутину, яка буде писати в мапу, то отримаємо помилку, навіть якщо будемо оновлювати вже наявні ключі.
// BAD CODE WITH ERROR EXAMPLE
package main
import (
"fmt"
)
func main() {
var values = map[byte]int{
'A': 1,
'B': 2,
'C': 3,
}
var keys = []byte{'A', 'B', 'C'}
go func() {
for {
for _, key := range keys {
// UNSAFE, fatal error: concurrent map read and map write
values[key] = values[key] + 1
}
}
}()
// panic: too many concurrent operations on a single file or socket (max 1048575)
for try := 1048575; try > 0; try-- {
go func() {
for i, key := range keys {
// UNSAFE, fatal error: concurrent map read and map write
var value = values[key]
fmt.Printf("index = %d, key = %c, value = %d
", i, key, value)
}
}()
}
}
go run main.go -race
Після запуску програма виведе в термінал очікувані повідомлення ~ index = 0, key = A, value = 850 N разів, а потім завершиться помилкою:
fatal error: concurrent map read and map write
Повна заміна мапи без синхронізації
Знову досліджуємо попередній приклад і замість оновлення мапи будемо робити її повну заміну. Перевіримо, чи це безпечно.
// BAD CODE WITH UNSAFE EXAMPLE
package main
import (
"fmt"
)
func main() {
var values = map[byte]int{
'A': 1,
'B': 2,
'C': 3,
}
var keys = []byte{'A', 'B', 'C'}
go func() {
for {
var replaceValues = make(map[byte]int, len(values))
for _, key := range keys {
replaceValues[key] = values[key] + 1
}
// UNSAFE replace on some GOARCH, for example arm(32)
values = replaceValues
}
}()
// panic: too many concurrent operations on a single file or socket (max 1048575)
for try := 1048575; try > 0; try-- {
go func() {
for i, key := range keys {
// UNSAFE read on some GOARCH, for example arm(32)
var value = values[key]
fmt.Printf("index = %d, key = %c, value = %d
", i, key, value)
}
}()
}
}
go run main.go -race
Після запуску жодних помилок у терміналі, отже, повна заміна мапи виконалась успішно на моєму комп’ютері.
go env
GOARCH="amd64" GOHOSTARCH="amd64"
На архітектурі amd64 — повна заміна мапи (вказівник розміром 8 байтів, або 64 біти) є атомарною операцією (безпечною), але на інших архітектурах, таких як 32-бітна arm , будуть помилки. Тому треба писати код, який буде безпечний і на інших архітектурах. Для цього потрібно правильно використовувати синхронізації: sync.Mutex, sync.RWMutex та atomic.Value, які гарантують потокобезпечність.
Універсальний sync.Mutex
Завдання sync.Mutex — надати ексклюзивний доступ до даних за допомогою двох методів Lock() та Unlock() . Візьмемо попередні приклади і зробимо їх потокобезпечними, використовуючи sync.Mutex. Приклад, в якому була помилка fatal error: concurrent map read and map write з sync.Mutex, буде саме таким:
package main
import (
"fmt"
"sync"
)
func main() {
var values = map[byte]int{
'A': 1,
'B': 2,
'C': 3,
}
var keys = []byte{'A', 'B', 'C'}
var mu = new(sync.Mutex)
go func() {
for {
for _, key := range keys {
mu.Lock()
values[key] = values[key] + 1
mu.Unlock()
}
}
}()
// panic: too many concurrent operations on a single file or socket (max 1048575)
for try := 1048575; try > 0; try-- {
go func() {
for i, key := range keys {
mu.Lock()
var value = values[key]
mu.Unlock()
fmt.Printf("index = %d, key = %c, value = %d
", i, key, value)
}
}()
}
}
go run main.go -race
Бачимо успішне завершення.
Але правильніше буде винести дані і мютекс в одну структуру, яка буде відповідати за доступ до даних. Об’єднати в одну структуру — це стандартне рішення, бо більше зрозуміло, які маніпуляції з даними відбуваються.
package main
import (
"fmt"
"sync"
)
type SyncCounter struct {
data map[byte]int
mu sync.Mutex
}
func NewSyncCounter(data map[byte]int) *SyncCounter {
return &SyncCounter{
data: data,
}
}
func (c *SyncCounter) Increment(key byte) {
c.mu.Lock()
c.data[key] += 1
c.mu.Unlock()
}
func (c *SyncCounter) Get(key byte) int {
c.mu.Lock()
var value = c.data[key]
c.mu.Unlock()
return value
}
func main() {
var values = NewSyncCounter(map[byte]int{
'A': 1,
'B': 2,
'C': 3,
})
var keys = []byte{'A', 'B', 'C'}
go func() {
for {
for _, key := range keys {
values.Increment(key)
}
}
}()
// panic: too many concurrent operations on a single file or socket (max 1048575)
for try := 1048575; try > 0; try-- {
go func() {
for i, key := range keys {
var value = values.Get(key)
fmt.Printf("index = %d, key = %c, value = %d
", i, key, value)
}
}()
}
}
На початку знайомства з Go універсального sync.Mutex та каналів вистачить для написання коду, але якщо захочете замінити на sync.RWMutex чи atomic.Value, то краще пишіть тести і запускайте з параметром race , щоб перевірити наявність помилок і можливе погіршення швидкодії після оптимізації.
Повна заміна даних з sync.Mutex, sync.RWMutex та atomic.Value
Далі зосередимось тільки на структурах з даними та потокобезпечним доступом. Коли повністю замінюємо дані, достатньо обгорнути мютексом тільки заміну і отримання вказівника:
import "sync"
type CityOnlineMutexMap struct {
data map[string]uint32
mu sync.Mutex
}
func NewCityOnlineMutexMap(data map[string]uint32) *CityOnlineMutexMap {
return &CityOnlineMutexMap{
data: data,
}
}
func (c *CityOnlineMutexMap) Update(data map[string]uint32) {
c.mu.Lock()
c.data = data
c.mu.Unlock()
}
func (c *CityOnlineMutexMap) Get(cityName string) uint32 {
c.mu.Lock()
var data = c.data
c.mu.Unlock()
return data[cityName]
}
У цьому прикладі пошук в мапі за ключем я виніс за мютекс, бо це безпечно. Коли дані частіше читаються, ніж пишуться, то можемо використати більш оптимальний для таких дій sync.RWMutex:
import "sync"
type CityOnlineRWMutexMap struct {
data map[string]uint32
mu sync.RWMutex
}
func NewCityOnlineRWMutexMap(data map[string]uint32) *CityOnlineRWMutexMap {
return &CityOnlineRWMutexMap{
data: data,
}
}
func (c *CityOnlineRWMutexMap) Update(data map[string]uint32) {
c.mu.Lock()
c.data = data
c.mu.Unlock()
}
func (c *CityOnlineRWMutexMap) Get(cityName string) uint32 {
c.mu.RLock()
var data = c.data
c.mu.RUnlock()
return data[cityName]
}
У цьому прикладі заміна буде відбуватись довше, бо під капотом RWMutex.Lock() викликається звичайний Mutex.Lock() та додаткові перевірки. Але читання через RWMutex.RLock швидше. Тести будуть далі.
Яка різниця між Mutex та RWMutex — одне зі стандартних питань на співбесіді. Якщо у вас є налаштований локально Go і IDE, то можете зайти в реалізацію RWMutex та почитати коментарі, які пояснюють, як працює RWMutex:
// RLock // A writer is pending, wait for it. // Lock // Announce to readers there is a pending writer. // Wait for active readers.А найефективніший варіант повної заміни даних через atomic.Value:
import (
"sync/atomic"
)
type CityOnlineAtomicMap struct {
data atomic.Value
}
func NewCityOnlineAtomicMap(data map[string]uint32) *CityOnlineAtomicMap {
var result = new(CityOnlineAtomicMap)
result.Update(data)
return result
}
func (c *CityOnlineAtomicMap) Update(data map[string]uint32) {
c.data.Store(data)
}
func (c *CityOnlineAtomicMap) Get(cityName string) uint32 {
var data = c.data.Load().(map[string]uint32)
return data[cityName]
}
А тепер напишемо тест, щоб порівняти, яка структура найкраща для повної заміни даних:
package main
import (
"sync"
"testing"
"time"
)
type cityOnlineMap interface {
Update(data map[string]uint32)
Get(cityName string) uint32
}
func BenchmarkCityOnlineMutexMap(b *testing.B) {
benchmarkCityOnlineMap(b, NewCityOnlineMutexMap(getCityOnlineMap()))
}
func BenchmarkCityOnlineRWMutexMap(b *testing.B) {
benchmarkCityOnlineMap(b, NewCityOnlineRWMutexMap(getCityOnlineMap()))
}
func BenchmarkCityOnlineAtomicMap(b *testing.B) {
benchmarkCityOnlineMap(b, NewCityOnlineAtomicMap(getCityOnlineMap()))
}
func benchmarkCityOnlineMap(b *testing.B, data cityOnlineMap) {
b.Helper()
var once = new(sync.Once)
var cities = []string{"kyiv", "kharkiv", "lviv", "dnipro", "odessa"}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
var isWriter = false
once.Do(func() {
isWriter = true
})
if isWriter {
for pb.Next() {
data.Update(getCityOnlineMap())
// read much more often than it is written
time.Sleep(time.Microsecond)
}
} else {
for pb.Next() {
for _, cityName := range cities {
_ = data.Get(cityName)
}
}
}
})
}
func getCityOnlineMap() map[string]uint32 {
var now = uint32(time.Now().Unix())
return map[string]uint32{
"kyiv": now,
"kharkiv": now,
"lviv": now,
"dnipro": now,
"odessa": now,
}
}
go test ./... -v -bench=. -benchmem
| Назва тесту | Середній час ітерації | Виділення пам’яті |
| BenchmarkCityOnlineMutexMap | 410 ns/op | 4 B/op 0 allocs/op |
| BenchmarkCityOnlineRWMutexMap | 241 ns/op | 0 B/op 0 allocs/op |
| BenchmarkCityOnlineAtomicMap | 10.4 ns/op | 0 B/op 0 allocs/op |
Якщо в задачі повна заміна даних, то краще використовувати atomic.Value як найефективнішу структуру (для цілочисельних даних є atomic.StoreUint64, atomic.StoreUint32, atomic.StoreInt64 та atomic.StoreInt32).
Екзотичні варіанти синхронізації через канали
Перший екзотичний варіант, який бачив у реальному проєкті:
// BAD CODE EXAMPLE, DON'T COPY-PASTE
type CityOnlineChanMutexMap struct {
data map[string]uint32
chanMutex chan struct{}
}
func NewCityOnlineChanMutexMap(data map[string]uint32) *CityOnlineChanMutexMap {
return &CityOnlineChanMutexMap{
data: data,
chanMutex: make(chan struct{}, 1),
}
}
func (c *CityOnlineChanMutexMap) Update(data map[string]uint32) {
c.chanMutex <- struct{}{}
c.data = data
_ = <-c.chanMutex
}
func (c *CityOnlineChanMutexMap) Get(cityName string) uint32 {
c.chanMutex <- struct{}{}
var result = c.data[cityName]
_ = <-c.chanMutex
return result
}
Під капотом каналів мютекси, і варіант вище поки найповільніший. У проєкті переписав його на sync.Mutex.
Другий екзотичний варіант помітив на просторах інтернету:
// BAD CODE EXAMPLE, DON'T COPY-PASTE
type cityOnlineRequest struct {
cityName string
online chan uint32
}
type CityOnlineChanReactorMap struct {
data map[string]uint32
requestChan chan cityOnlineRequest
dataChan chan map[string]uint32
}
func NewCityOnlineChanReactorMap(data map[string]uint32) *CityOnlineChanReactorMap {
var result = &CityOnlineChanReactorMap{
data: data,
requestChan: make(chan cityOnlineRequest),
dataChan: make(chan map[string]uint32),
}
go result.run()
return result
}
func (c *CityOnlineChanReactorMap) Update(data map[string]uint32) {
c.dataChan <- data
}
func (c *CityOnlineChanReactorMap) Get(cityName string) uint32 {
var request = cityOnlineRequest{
cityName: cityName,
online: make(chan uint32),
}
c.requestChan <- request
return <-request.online
}
func (c *CityOnlineChanReactorMap) run() {
for {
select {
case request := <-c.requestChan:
request.online <- c.data[request.cityName]
case data := <-c.dataChan:
c.data = data
}
}
}
Цей варіант ще повільніший.
| Назва тесту | Середній час ітерації | Виділення пам’яті |
| BenchmarkCityOnlineMutexMap | 410 ns/op | 4 B/op 0 allocs/op |
| BenchmarkCityOnlineRWMutexMap | 241 ns/op | 0 B/op 0 allocs/op |
| BenchmarkCityOnlineAtomicMap | 10.4 ns/op | 0 B/op 0 allocs/op |
| BenchmarkCityOnlineChanMutexMap | 2037 ns/op | 60 B/op 0 allocs/op |
| BenchmarkCityOnlineChanReactorMap | 3740 ns/op | 467 B/op 4 allocs/op |
Якщо на вашому проєкті є щось схоже, можете відправити мені анонімно — і я додам в коментарях.
Повернення гетера
У попередніх прикладах розглядали взаємодію з даними всередині структури. Але якщо нам знадобиться отримати значення одразу для багатьох ключів, то робити синхронізацію на кожен ключ буде повільно. Якщо ж повернемо всю мапу, буде складніше розуміти, що далі відбувається з даними. Тож повернемо інтерфейс CityOnlineGetter :
import (
"sync/atomic"
)
type CityOnlineGetter interface {
Get(cityName string) uint32
}
type CityOnlineMap struct {
data map[string]uint32
}
func (c *CityOnlineMap) Get(cityName string) uint32 {
return c.data[cityName]
}
type CityOnlineAtomicMap struct {
data atomic.Value
}
func NewCityOnlineAtomicMap(data map[string]uint32) *CityOnlineAtomicMap {
var result = new(CityOnlineAtomicMap)
result.Update(data)
return result
}
func (c *CityOnlineAtomicMap) Update(data map[string]uint32) {
c.data.Store(data)
}
func (c *CityOnlineAtomicMap) Get(cityName string) uint32 {
var data = c.data.Load().(map[string]uint32)
return data[cityName]
}
// BAD CODE
//func (c *CityOnlineAtomicMap) Load() map[string]uint32 {
// var data = c.data.Load().(map[string]uint32)
//
// return data
//}
func (c *CityOnlineAtomicMap) Load() CityOnlineGetter {
var data = c.data.Load().(map[string]uint32)
return &CityOnlineMap{
data: data,
}
}
Помилки, які можуть трапитися під час оновлення даних
Цю помилку, коли мютекс довго блокує виконання, бачив у реальних проєктах. Правильні приклади без помилок вже наведені у цій статті вище.
// BAD CODE EXAMPLE, DON'T COPY-PASTE
import (
"sync"
"time"
)
type CityOnline struct {
CityName string
Online uint32
}
type CityOnlineTooLongUpdateMap struct {
data map[string]uint32
mu sync.RWMutex
}
func NewCityOnlineTooLongUpdateMap(data map[string]uint32) *CityOnlineTooLongUpdateMap {
return &CityOnlineTooLongUpdateMap{
data: data,
}
}
func (c *CityOnlineTooLongUpdateMap) UpdateBySlice(items []CityOnline) {
c.mu.Lock()
defer c.mu.Unlock()
// other logic
// for example API call, emulate by time.Sleep
time.Sleep(time.Second)
c.data = make(map[string]uint32, len(items))
for _, item := range items {
c.data[item.CityName] = item.Online
}
}
func (c *CityOnlineTooLongUpdateMap) Get(cityName string) uint32 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data[cityName]
}
У такому прикладі RLock буде чекати defer c.mu.Unlock() , відповідно читання буде заблоковано на секунду. Те саме стосується звичайного Mutex-а .
atomic.Value та збереження інших типів
Коли в статті розглядаєш тільки мапи, то виникає відчуття, що тільки з мапами atomic.Value і використовують. Що ж, для слайсів і звичайних структур також підходить:
import "sync/atomic"
type CountrySettings struct {
BlockCIDRs []string `json:"block_cidrs"`
}
type Settings struct {
Countries map[string]CountrySettings `json:"countries"`
PackSize uint32 `json:"pack_size"`
Interval uint32 `json:"interval"`
Debug bool `json:"debug"`
}
type SettingsProxy struct {
data atomic.Value
}
func NewSettingsProxy() *SettingsProxy {
return &SettingsProxy{}
}
func (s *SettingsProxy) Update(value Settings) {
s.data.Store(value)
}
func (s *SettingsProxy) Load() (Settings, bool) {
var result, ok = s.data.Load().(Settings)
return result, ok
}
import (
"github.com/stretchr/testify/require"
"testing"
)
func TestSettingsProxy(t *testing.T) {
var proxy = NewSettingsProxy()
{
var settings, ok = proxy.Load()
require.Equal(t, Settings{}, settings)
require.Equal(t, false, ok)
}
{
var expected = Settings{
Countries: map[string]CountrySettings{
"UA": {
BlockCIDRs: nil,
},
},
PackSize: 20000,
Interval: 60,
Debug: true,
}
proxy.Update(expected)
var actual, ok = proxy.Load()
require.Equal(t, expected, actual)
require.Equal(t, true, ok)
}
}
У попередніх прикладах в конструкторі я робив збереження значення в atomic.Value, а тут навмисно пропустив, щоб вказати потребу перевірки при приведенні типу:
func (s *SettingsProxy) Load() (Settings, bool) {
var result, ok = s.data.Load().(Settings)
return result, ok
}
Бо інакше буде паніка:
panic: interface conversion: interface {} is nil
Такий варіант теж допустимий:
func (s *SettingsProxy) Update(value *Settings) {
s.data.Store(value)
}
func (s *SettingsProxy) Load() (*Settings, bool) {
var result, ok = s.data.Load().(*Settings)
return result, ok
}
Застереження atomic.Value
Під час спроби зберегти nil чи різні типи отримуємо паніку. Ось тести, які показують таку поведінку:
func TestAtomicSuccessStoreNil(t *testing.T) {
var atomicValue = new(atomic.Value)
var value map[uint32]uint32 = nil
atomicValue.Store(value)
}
func TestAtomicPanicOnStoreNil(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
var atomicValue = new(atomic.Value)
// will panic
atomicValue.Store(nil)
}
func TestAtomicPanicOnStoreNilInterface(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
var atomicValue = new(atomic.Value)
var value interface{} = nil
// will panic
atomicValue.Store(value)
}
func TestAtomicPanicOnStoreDifferentTypes(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
var atomicValue = new(atomic.Value)
{
var value uint32
// will success
atomicValue.Store(value)
}
{
var value uint32 = 1
// will success
atomicValue.Store(value)
}
{
var value = ""
// will panic
atomicValue.Store(value)
}
}
Ці застереження написані в коментарях до коду atomic.Value .
atomic.Value та збереження int32, int64, uint32, uint64
Для числових типів можна використовувати й atomic.Value:
import "sync/atomic"
type AtomicValueUint64 struct {
data atomic.Value
}
func NewAtomicValueUint64(data uint64) *AtomicValueUint64 {
var result = new(AtomicValueUint64)
result.Update(data)
return result
}
func (c *AtomicValueUint64) Update(data uint64) {
c.data.Store(data)
}
func (c *AtomicValueUint64) Load() uint64 {
return c.data.Load().(uint64)
}
Але можна простіше:
import "sync/atomic"
type AtomicUint64 struct {
data uint64
}
func NewAtomicUint64(value uint64) *AtomicUint64 {
var result = new(AtomicUint64)
result.Update(value)
return result
}
func (c *AtomicUint64) Update(value uint64) {
atomic.StoreUint64(&c.data, value)
}
func (c *AtomicUint64) Load() uint64 {
return atomic.LoadUint64(&c.data)
}
Є спеціальні функції в пакеті sync/atomic :
func LoadInt32(addr *int32) (val int32) {}
func LoadInt64(addr *int64) (val int64) {}
func LoadUint32(addr *uint32) (val uint32) {}
func LoadUint64(addr *uint64) (val uint64) {}
func StoreInt32(addr *int32, val int32) {}
func StoreInt64(addr *int64, val int64) {}
func StoreUint32(addr *uint32, val uint32) {}
func StoreUint64(addr *uint64, val uint64) {}
Епілог
У статті є повно прикладів, які трохи відрізняються, щоб краще запам’ятати. Бо коли код простий, то сам приклад зрозуміліший за текстовий опис.
Схожі оптимізації з atomic.Value для повної заміни даних знадобляться під час написання бібліотек. У робочому закритому проєкті краще використовуйте RWMutex, бо конвертація типів — це джерело помилок.
А ще під кожний тип даних треба писати окрему обгортку з RWMutex, як було у прикладах, бо в Go відсутні дженерики.
Опубліковано: 18/12/20 @ 01:00
Розділ Різне
Рекомендуємо:
PHP: Настраиваем отладку. PhpStorm + PHP 8 + Docker + Xdebug 3
От шока до принятия: пять стадий тестирования API
У що інвестують ІТ-спеціалісти і як це працює: нерухомість, бізнес, ОВДП, індексні фонди
PHP: как удалить элемент массива по значению
Как я работаю: Александр Гончар, Chief AI Officer в Neurons Lab