Use concurrent sync.Map (new in Go 1.9)

This commit is contained in:
Trevor Slocum 2017-09-12 22:51:51 -07:00
parent 66b6c5630c
commit e06d24a1a0
10 changed files with 143 additions and 1259 deletions

8
Gopkg.lock generated
View File

@ -7,12 +7,6 @@
revision = "b26d9c308763d68093482582cea63d69be07a0f0"
version = "v0.3.0"
[[projects]]
branch = "master"
name = "github.com/orcaman/concurrent-map"
packages = ["."]
revision = "2ae17bc4c860c83513ee50feb9746f3e50d7515d"
[[projects]]
branch = "v2"
name = "gopkg.in/sorcix/irc.v2"
@ -22,6 +16,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "204144074458006bd79f81651581e678f9dfc09468db8c9faa41833302e67213"
inputs-digest = "0be9e577dd19c1613689669c289fb1cd6f32b0a47db9d3448a25d405cc1d456d"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -1,12 +1,23 @@
package main
import "github.com/orcaman/concurrent-map"
import (
"sync"
)
type Channel struct {
Entity
clients cmap.ConcurrentMap
clients *sync.Map
topic string
topictime int64
}
func NewChannel(identifier string) *Channel {
c := &Channel{}
c.Initialize(ENTITY_CHANNEL, identifier)
c.clients = new(sync.Map)
return c
}

View File

@ -23,6 +23,20 @@ type Client struct {
capHostInNames bool
}
func NewClient(identifier string, conn net.Conn, ssl bool) *Client {
c := &Client{}
c.Initialize(ENTITY_CLIENT, identifier)
c.ssl = ssl
c.nick = "*"
c.conn = conn
c.writebuffer = make(chan *irc.Message, writebuffersize)
c.reader = irc.NewDecoder(conn)
c.writer = irc.NewEncoder(conn)
return c
}
func (c *Client) getPrefix() *irc.Prefix {
return &irc.Prefix{Name: c.nick, User: c.user, Host: c.host}
}

View File

@ -4,8 +4,7 @@ import (
"fmt"
"strings"
"sync"
"github.com/orcaman/concurrent-map"
"time"
)
const ENTITY_CLIENT = 0
@ -23,21 +22,32 @@ type Entity struct {
identifier string
created int64
state int
modes cmap.ConcurrentMap
modes *sync.Map
}
*sync.RWMutex
func (e *Entity) Initialize(etype int, identifier string) {
e.identifier = identifier
e.entitytype = etype
e.created = time.Now().Unix()
e.state = ENTITY_STATE_NORMAL
e.modes = new(sync.Map)
}
func (e *Entity) getModes() map[string]string {
modes := make(map[string]string)
for ms := range e.modes.IterBuffered() {
modes[ms.Key] = ms.Val.(string)
}
e.modes.Range(func(k, v interface{}) bool {
modes[k.(string)] = v.(string)
return true
})
return modes
}
func (e *Entity) hasMode(mode string) bool {
return e.modes.Has(mode)
_, ok := e.modes.Load(mode)
return ok
}
func (e *Entity) addMode(mode string, value string) {
@ -49,7 +59,7 @@ func (e *Entity) addMode(mode string, value string) {
}
if strings.Index(allowedmodes, mode) != -1 && !e.hasMode(mode) {
e.modes.Set(mode, value)
e.modes.Store(mode, value)
}
}
@ -61,7 +71,7 @@ func (e *Entity) addModes(modes string) {
func (e *Entity) removeMode(mode string) {
if e.hasMode(mode) {
e.modes.Remove(mode)
e.modes.Delete(mode)
}
}
@ -74,17 +84,18 @@ func (e *Entity) removeModes(modes string) {
func (e *Entity) diffModes(lastmodes map[string]string) (map[string]string, map[string]string) {
addedmodes := make(map[string]string)
if lastmodes != nil {
for m := range e.modes.IterBuffered() {
if _, ok := lastmodes[m.Key]; !ok {
addedmodes[m.Key] = lastmodes[m.Key]
e.modes.Range(func(k, v interface{}) bool {
if _, ok := lastmodes[k.(string)]; !ok {
addedmodes[k.(string)] = lastmodes[k.(string)]
}
}
return true
})
}
removedmodes := make(map[string]string)
for mode := range lastmodes {
if e.hasMode(mode) {
m, _ := e.modes.Get(mode)
if m, ok := e.modes.Load(mode); ok {
removedmodes[mode] = m.(string)
}
}

155
server.go
View File

@ -16,7 +16,6 @@ import (
"time"
"github.com/BurntSushi/toml"
cmap "github.com/orcaman/concurrent-map"
irc "gopkg.in/sorcix/irc.v2"
)
@ -29,8 +28,8 @@ type Config struct {
type Server struct {
config *Config
created int64
clients cmap.ConcurrentMap
channels cmap.ConcurrentMap
clients *sync.Map
channels *sync.Map
odyssey *os.File
odysseymutex *sync.RWMutex
@ -42,9 +41,11 @@ type Server struct {
func NewServer() *Server {
s := &Server{}
s.config = &Config{}
s.created = time.Now().Unix()
s.clients = cmap.New()
s.channels = cmap.New()
s.clients = new(sync.Map)
s.channels = new(sync.Map)
s.odysseymutex = new(sync.RWMutex)
s.restartplain = make(chan bool, 1)
s.restartssl = make(chan bool, 1)
@ -62,7 +63,7 @@ func (s *Server) getAnonymousPrefix(i int) *irc.Prefix {
}
func (s *Server) getChannel(channel string) *Channel {
if ch, ok := s.channels.Get(channel); ok {
if ch, ok := s.channels.Load(channel); ok {
return ch.(*Channel)
}
@ -71,16 +72,21 @@ func (s *Server) getChannel(channel string) *Channel {
func (s *Server) getChannels(client string) map[string]*Channel {
channels := make(map[string]*Channel)
for chs := range s.channels.IterBuffered() {
if s.inChannel(chs.Key, client) {
channels[chs.Key] = chs.Val.(*Channel)
s.channels.Range(func(k, v interface{}) bool {
key := k.(string)
channel := v.(*Channel)
if s.inChannel(key, client) {
channels[key] = channel
}
}
return true
})
return channels
}
func (s *Server) getClient(client string) *Client {
if cl, ok := s.clients.Get(client); ok {
if cl, ok := s.clients.Load(client); ok {
return cl.(*Client)
}
@ -92,15 +98,20 @@ func (s *Server) getClients(channel string) map[string]*Client {
ch := s.getChannel(channel)
for cls := range ch.clients.IterBuffered() {
clients[cls.Key] = cls.Val.(*Client)
}
ch.clients.Range(func(k, v interface{}) bool {
cl := s.getClient(k.(string))
if cl != nil {
clients[cl.identifier] = cl
}
return true
})
return clients
}
func (s *Server) channelExists(channel string) bool {
if _, ok := s.channels.Get(channel); ok {
if _, ok := s.channels.Load(channel); ok {
return true
}
@ -110,7 +121,8 @@ func (s *Server) channelExists(channel string) bool {
func (s *Server) inChannel(channel string, client string) bool {
ch := s.getChannel(channel)
if ch != nil {
return ch.clients.Has(client)
_, ok := ch.clients.Load(client)
return ok
}
return false
@ -129,14 +141,14 @@ func (s *Server) joinChannel(channel string, client string) {
}
if ch == nil {
ch = &Channel{Entity{ENTITY_CHANNEL, channel, time.Now().Unix(), ENTITY_STATE_NORMAL, cmap.New(), new(sync.RWMutex)}, cmap.New(), "", 0}
s.channels.Set(channel, ch)
ch = NewChannel(channel)
s.channels.Store(channel, ch)
} else if ch.hasMode("z") && !cl.ssl {
cl.sendNotice("Unable to join " + channel + ": SSL connections only (channel mode +z)")
return
}
ch.clients.Set(client, s.getClientCount(channel, client)+1)
ch.clients.Store(client, s.getClientCount(channel, client)+1)
cl.write(&irc.Message{cl.getPrefix(), irc.JOIN, []string{channel}})
s.sendNames(channel, client)
@ -153,7 +165,7 @@ func (s *Server) partChannel(channel string, client string, reason string) {
}
cl.write(&irc.Message{cl.getPrefix(), irc.PART, []string{channel, reason}})
ch.clients.Remove(client)
ch.clients.Delete(client)
s.updateClientCount(channel, client)
}
@ -184,7 +196,11 @@ func (s *Server) getClientCount(channel string, client string) int {
return 0
}
ccount := ch.clients.Count()
ccount := 0
ch.clients.Range(func(k, v interface{}) bool {
ccount++
return true
})
if (ch.hasMode("c") || cl.hasMode("c")) && ccount >= 2 {
return 2
@ -200,38 +216,36 @@ func (s *Server) updateClientCount(channel string, client string) {
return
}
for cls := range ch.clients.IterBuffered() {
cclient := cls.Key
ccount := cls.Val.(int)
if client != "" && ch.hasMode("D") && cclient != client {
continue
}
cl := s.getClient(cclient)
ch.clients.Range(func(k, v interface{}) bool {
cl := s.getClient(k.(string))
ccount := v.(int)
if cl == nil {
continue
return true
} else if client != "" && ch.hasMode("D") && cl.identifier != client {
return true
}
chancount := s.getClientCount(channel, cclient)
chancount := s.getClientCount(channel, cl.identifier)
if ccount < chancount {
for i := ccount; i < chancount; i++ {
cl.write(&irc.Message{s.getAnonymousPrefix(i), irc.JOIN, []string{channel}})
}
ch.clients.Set(cclient, chancount)
ch.clients.Store(cl.identifier, chancount)
} else if ccount > chancount {
for i := ccount; i > chancount; i-- {
cl.write(&irc.Message{s.getAnonymousPrefix(i - 1), irc.PART, []string{channel}})
}
} else {
continue
return true
}
ch.clients.Set(cclient, chancount)
}
ch.clients.Store(cl.identifier, chancount)
return true
})
}
func (s *Server) sendNames(channel string, clientname string) {
@ -307,9 +321,10 @@ func (s *Server) handleTopic(channel string, client string, topic string) {
ch.topic = topic
ch.topictime = time.Now().Unix()
for cls := range ch.clients.IterBuffered() {
s.sendTopic(channel, cls.Key, true)
}
ch.clients.Range(func(k, v interface{}) bool {
s.sendTopic(channel, k.(string), true)
return true
})
} else {
s.sendTopic(channel, client, false)
}
@ -335,8 +350,8 @@ func (s *Server) handleMode(c *Client, params []string) {
c.write(&irc.Message{&anonirc, strings.Join([]string{"329", c.nick, params[0], fmt.Sprintf("%d", int32(ch.created))}, " "), []string{}})
} else if len(params) > 1 && len(params[1]) > 0 && (params[1][0] == '+' || params[1][0] == '-') {
lastmodes := make(map[string]string)
for ms := range ch.modes.IterBuffered() {
lastmodes[ms.Key] = ms.Val.(string)
for m, mv := range ch.getModes() {
lastmodes[m] = mv
}
if params[1][0] == '+' {
@ -346,7 +361,7 @@ func (s *Server) handleMode(c *Client, params []string) {
}
s.enforceModes(params[0])
if !reflect.DeepEqual(ch.modes.Items(), lastmodes) {
if !reflect.DeepEqual(ch.getModes(), lastmodes) {
// TODO: Check if local modes were set/unset, only send changes to local client
addedmodes, removedmodes := ch.diffModes(lastmodes)
@ -365,13 +380,14 @@ func (s *Server) handleMode(c *Client, params []string) {
addedmodes = c.getModes()
}
for cls := range ch.clients.IterBuffered() {
cl := s.getClient(cls.Key)
ch.clients.Range(func(k, v interface{}) bool {
cl := s.getClient(k.(string))
if cl != nil {
cl.write(&irc.Message{&anonymous, irc.MODE, []string{params[0], ch.printModes(addedmodes, removedmodes)}})
}
}
return true
})
if resendusercount {
s.updateClientCount(params[0], c.identifier)
@ -436,20 +452,21 @@ func (s *Server) handlePrivmsg(channel string, client string, message string) {
s.updateClientCount(channel, "")
for cls := range ch.clients.IterBuffered() {
ccl := s.getClient(cls.Key)
if ccl != nil && ccl.identifier != client {
ccl.write(&irc.Message{&anonymous, irc.PRIVMSG, []string{channel, message}})
ch.clients.Range(func(k, v interface{}) bool {
cl := s.getClient(k.(string))
if cl != nil && cl.identifier != client {
cl.write(&irc.Message{&anonymous, irc.PRIVMSG, []string{channel, message}})
}
}
return true
})
}
func (s *Server) handleRead(c *Client) {
for {
c.conn.SetDeadline(time.Now().Add(300 * time.Second))
if !s.clients.Has(c.identifier) {
if _, ok := s.clients.Load(c.identifier); !ok {
s.killClient(c)
return
}
@ -533,13 +550,16 @@ func (s *Server) handleRead(c *Client) {
}
} else if msg.Command == irc.LIST {
chans := make(map[string]int)
for chs := range s.channels.IterBuffered() {
ch := s.getChannel(chs.Key)
s.channels.Range(func(k, v interface{}) bool {
key := k.(string)
ch := v.(*Channel)
if ch != nil && !ch.hasMode("p") && !ch.hasMode("s") {
chans[chs.Key] = s.getClientCount(chs.Key, c.identifier)
chans[key] = s.getClientCount(key, c.identifier)
}
}
return true
})
c.write(&irc.Message{&anonirc, irc.RPL_LISTSTART, []string{"Channel", "Users Name"}})
for _, pl := range sortMapByValues(chans) {
@ -624,20 +644,20 @@ func (s *Server) handleConnection(conn net.Conn, ssl bool) {
for {
identifier = randomIdentifier()
if !s.clients.Has(identifier) {
if _, ok := s.clients.Load(identifier); !ok {
break
}
}
client := &Client{Entity{ENTITY_CLIENT, identifier, time.Now().Unix(), ENTITY_STATE_NORMAL, cmap.New(), new(sync.RWMutex)}, ssl, "*", "", "", conn, make(chan *irc.Message, writebuffersize), irc.NewDecoder(conn), irc.NewEncoder(conn), false}
s.clients.Set(client.identifier, client)
client := NewClient(identifier, conn, ssl)
s.clients.Store(client.identifier, client)
go s.handleWrite(client)
s.handleRead(client)
s.handleRead(client) // Block until the connection is closed
s.killClient(client)
close(client.writebuffer)
s.clients.Remove(identifier)
s.clients.Delete(identifier)
}
func (s *Server) killClient(c *Client) {
@ -648,7 +668,7 @@ func (s *Server) killClient(c *Client) {
c.write(nil)
c.conn.Close()
if s.clients.Has(c.identifier) {
if _, ok := s.clients.Load(c.identifier); ok {
s.partAllChannels(c.identifier)
}
}
@ -723,13 +743,14 @@ func (s *Server) listenSSL() {
func (s *Server) pingClients() {
for {
for cls := range s.clients.IterBuffered() {
cl := s.getClient(cls.Key)
s.clients.Range(func(k, v interface{}) bool {
cl := v.(*Client)
if cl != nil {
cl.write(&irc.Message{nil, irc.PING, []string{fmt.Sprintf("anonirc%d%d", int32(time.Now().Unix()), rand.Intn(1000))}})
}
}
return true
})
time.Sleep(90 * time.Second)
}
}

View File

@ -1,22 +0,0 @@
The MIT License (MIT)
Copyright (c) 2014 streamrail
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,60 +0,0 @@
# concurrent map [![Circle CI](https://circleci.com/gh/orcaman/concurrent-map.png?style=badge)](https://circleci.com/gh/orcaman/concurrent-map)
As explained [here](http://golang.org/doc/faq#atomic_maps) and [here](http://blog.golang.org/go-maps-in-action), the `map` type in Go doesn't support concurrent reads and writes. `concurrent-map` provides a high-performance solution to this by sharding the map with minimal time spent waiting for locks.
## usage
Import the package:
```go
import (
"github.com/orcaman/concurrent-map"
)
```
```bash
go get "github.com/orcaman/concurrent-map"
```
The package is now imported under the "cmap" namespace.
## example
```go
// Create a new map.
map := cmap.New()
// Sets item within map, sets "bar" under key "foo"
map.Set("foo", "bar")
// Retrieve item from map.
if tmp, ok := map.Get("foo"); ok {
bar := tmp.(string)
}
// Removes item under key "foo"
map.Remove("foo")
```
For more examples have a look at concurrent_map_test.go.
Running tests:
```bash
go test "github.com/orcaman/concurrent-map"
```
## guidelines for contributing
Contributions are highly welcome. In order for a contribution to be merged, please follow these guidelines:
- Open an issue and describe what you are after (fixing a bug, adding an enhancement, etc.).
- According to the core team's feedback on the above mentioned issue, submit a pull request, describing the changes and linking to the issue.
- New code must have test coverage.
- If the code is about performance issues, you must include benchmarks in the process (either in the issue or in the PR).
- In general, we would like to keep `concurrent-map` as simple as possible and as similar to the native `map`. Please keep this in mind when opening issues.
## license
MIT (see [LICENSE](https://github.com/orcaman/concurrent-map/blob/master/LICENSE) file)

View File

@ -1,315 +0,0 @@
package cmap
import (
"encoding/json"
"sync"
)
var SHARD_COUNT = 32
// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap []*ConcurrentMapShared
// A "thread" safe string to anything map.
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // Read Write mutex, guards access to internal map.
}
// Creates a new concurrent map.
func New() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}
// Returns shard under given key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}
func (m ConcurrentMap) MSet(data map[string]interface{}) {
for key, value := range data {
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
}
// Sets the given value under the specified key.
func (m ConcurrentMap) Set(key string, value interface{}) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
// Callback to return new element to be inserted into the map
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWLock is not reentrant
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
// Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) {
shard := m.GetShard(key)
shard.Lock()
v, ok := shard.items[key]
res = cb(ok, v, value)
shard.items[key] = res
shard.Unlock()
return res
}
// Sets the given value under the specified key if no value was associated with it.
func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
_, ok := shard.items[key]
if !ok {
shard.items[key] = value
}
shard.Unlock()
return !ok
}
// Retrieves an element from map under given key.
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// Get shard
shard := m.GetShard(key)
shard.RLock()
// Get item from shard.
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}
// Returns the number of elements within the map.
func (m ConcurrentMap) Count() int {
count := 0
for i := 0; i < SHARD_COUNT; i++ {
shard := m[i]
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}
return count
}
// Looks up an item under specified key
func (m ConcurrentMap) Has(key string) bool {
// Get shard
shard := m.GetShard(key)
shard.RLock()
// See if element is within shard.
_, ok := shard.items[key]
shard.RUnlock()
return ok
}
// Removes an element from the map.
func (m ConcurrentMap) Remove(key string) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
delete(shard.items, key)
shard.Unlock()
}
// Removes an element from the map and returns it
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) {
// Try to get shard.
shard := m.GetShard(key)
shard.Lock()
v, exists = shard.items[key]
delete(shard.items, key)
shard.Unlock()
return v, exists
}
// Checks if map is empty.
func (m ConcurrentMap) IsEmpty() bool {
return m.Count() == 0
}
// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type Tuple struct {
Key string
Val interface{}
}
// Returns an iterator which could be used in a for range loop.
//
// Deprecated: using IterBuffered() will get a better performence
func (m ConcurrentMap) Iter() <-chan Tuple {
chans := snapshot(m)
ch := make(chan Tuple)
go fanIn(chans, ch)
return ch
}
// Returns a buffered iterator which could be used in a for range loop.
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
chans := snapshot(m)
total := 0
for _, c := range chans {
total += cap(c)
}
ch := make(chan Tuple, total)
go fanIn(chans, ch)
return ch
}
// Returns a array of channels that contains elements in each shard,
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot(m ConcurrentMap) (chans []chan Tuple) {
chans = make([]chan Tuple, SHARD_COUNT)
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
// Foreach shard.
for index, shard := range m {
go func(index int, shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
}
shard.RUnlock()
close(chans[index])
}(index, shard)
}
wg.Wait()
return chans
}
// fanIn reads elements from channels `chans` into channel `out`
func fanIn(chans []chan Tuple, out chan Tuple) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
go func(ch chan Tuple) {
for t := range ch {
out <- t
}
wg.Done()
}(ch)
}
wg.Wait()
close(out)
}
// Returns all items as map[string]interface{}
func (m ConcurrentMap) Items() map[string]interface{} {
tmp := make(map[string]interface{})
// Insert items to temporary map.
for item := range m.IterBuffered() {
tmp[item.Key] = item.Val
}
return tmp
}
// Iterator callback,called for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
type IterCb func(key string, v interface{})
// Callback based iterator, cheapest way to read
// all elements in a map.
func (m ConcurrentMap) IterCb(fn IterCb) {
for idx := range m {
shard := (m)[idx]
shard.RLock()
for key, value := range shard.items {
fn(key, value)
}
shard.RUnlock()
}
}
// Return all keys as []string
func (m ConcurrentMap) Keys() []string {
count := m.Count()
ch := make(chan string, count)
go func() {
// Foreach shard.
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
for _, shard := range m {
go func(shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
for key := range shard.items {
ch <- key
}
shard.RUnlock()
wg.Done()
}(shard)
}
wg.Wait()
close(ch)
}()
// Generate keys
keys := make([]string, 0, count)
for k := range ch {
keys = append(keys, k)
}
return keys
}
//Reviles ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
// Create a temporary map, which will hold all item spread across shards.
tmp := make(map[string]interface{})
// Insert items to temporary map.
for item := range m.IterBuffered() {
tmp[item.Key] = item.Val
}
return json.Marshal(tmp)
}
func fnv32(key string) uint32 {
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}
// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal
// will probably won't know which to type to unmarshal into, in such case
// we'll end up with a value of type map[string]interface{}, In most cases this isn't
// out value type, this is why we've decided to remove this functionality.
// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
// // Reverse process of Marshal.
// tmp := make(map[string]interface{})
// // Unmarshal into a single map.
// if err := json.Unmarshal(b, &tmp); err != nil {
// return nil
// }
// // foreach key,value pair in temporary map insert into our concurrent map.
// for key, val := range tmp {
// m.Set(key, val)
// }
// return nil
// }

View File

@ -1,196 +0,0 @@
package cmap
import "testing"
import "strconv"
func BenchmarkItems(b *testing.B) {
m := New()
// Insert 100 elements.
for i := 0; i < 10000; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
for i := 0; i < b.N; i++ {
m.Items()
}
}
func BenchmarkMarshalJson(b *testing.B) {
m := New()
// Insert 100 elements.
for i := 0; i < 10000; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
for i := 0; i < b.N; i++ {
m.MarshalJSON()
}
}
func BenchmarkStrconv(b *testing.B) {
for i := 0; i < b.N; i++ {
strconv.Itoa(i)
}
}
func BenchmarkSingleInsertAbsent(b *testing.B) {
m := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Set(strconv.Itoa(i), "value")
}
}
func BenchmarkSingleInsertPresent(b *testing.B) {
m := New()
m.Set("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Set("key", "value")
}
}
func benchmarkMultiInsertDifferent(b *testing.B) {
m := New()
finished := make(chan struct{}, b.N)
_, set := GetSet(m, finished)
b.ResetTimer()
for i := 0; i < b.N; i++ {
set(strconv.Itoa(i), "value")
}
for i := 0; i < b.N; i++ {
<-finished
}
}
func BenchmarkMultiInsertDifferent_1_Shard(b *testing.B) {
runWithShards(benchmarkMultiInsertDifferent, b, 1)
}
func BenchmarkMultiInsertDifferent_16_Shard(b *testing.B) {
runWithShards(benchmarkMultiInsertDifferent, b, 16)
}
func BenchmarkMultiInsertDifferent_32_Shard(b *testing.B) {
runWithShards(benchmarkMultiInsertDifferent, b, 32)
}
func BenchmarkMultiInsertDifferent_256_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetDifferent, b, 256)
}
func BenchmarkMultiInsertSame(b *testing.B) {
m := New()
finished := make(chan struct{}, b.N)
_, set := GetSet(m, finished)
m.Set("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
set("key", "value")
}
for i := 0; i < b.N; i++ {
<-finished
}
}
func BenchmarkMultiGetSame(b *testing.B) {
m := New()
finished := make(chan struct{}, b.N)
get, _ := GetSet(m, finished)
m.Set("key", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
get("key", "value")
}
for i := 0; i < b.N; i++ {
<-finished
}
}
func benchmarkMultiGetSetDifferent(b *testing.B) {
m := New()
finished := make(chan struct{}, 2*b.N)
get, set := GetSet(m, finished)
m.Set("-1", "value")
b.ResetTimer()
for i := 0; i < b.N; i++ {
set(strconv.Itoa(i-1), "value")
get(strconv.Itoa(i), "value")
}
for i := 0; i < 2*b.N; i++ {
<-finished
}
}
func BenchmarkMultiGetSetDifferent_1_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetDifferent, b, 1)
}
func BenchmarkMultiGetSetDifferent_16_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetDifferent, b, 16)
}
func BenchmarkMultiGetSetDifferent_32_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetDifferent, b, 32)
}
func BenchmarkMultiGetSetDifferent_256_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetDifferent, b, 256)
}
func benchmarkMultiGetSetBlock(b *testing.B) {
m := New()
finished := make(chan struct{}, 2*b.N)
get, set := GetSet(m, finished)
for i := 0; i < b.N; i++ {
m.Set(strconv.Itoa(i%100), "value")
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
set(strconv.Itoa(i%100), "value")
get(strconv.Itoa(i%100), "value")
}
for i := 0; i < 2*b.N; i++ {
<-finished
}
}
func BenchmarkMultiGetSetBlock_1_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetBlock, b, 1)
}
func BenchmarkMultiGetSetBlock_16_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetBlock, b, 16)
}
func BenchmarkMultiGetSetBlock_32_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetBlock, b, 32)
}
func BenchmarkMultiGetSetBlock_256_Shard(b *testing.B) {
runWithShards(benchmarkMultiGetSetBlock, b, 256)
}
func GetSet(m ConcurrentMap, finished chan struct{}) (set func(key, value string), get func(key, value string)) {
return func(key, value string) {
for i := 0; i < 10; i++ {
m.Get(key)
}
finished <- struct{}{}
}, func(key, value string) {
for i := 0; i < 10; i++ {
m.Set(key, value)
}
finished <- struct{}{}
}
}
func runWithShards(bench func(b *testing.B), b *testing.B, shardsCount int) {
oldShardsCount := SHARD_COUNT
SHARD_COUNT = shardsCount
bench(b)
SHARD_COUNT = oldShardsCount
}
func BenchmarkKeys(b *testing.B) {
m := New()
// Insert 100 elements.
for i := 0; i < 10000; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
for i := 0; i < b.N; i++ {
m.Keys()
}
}

View File

@ -1,574 +0,0 @@
package cmap
import (
"encoding/json"
"hash/fnv"
"sort"
"strconv"
"testing"
)
type Animal struct {
name string
}
func TestMapCreation(t *testing.T) {
m := New()
if m == nil {
t.Error("map is null.")
}
if m.Count() != 0 {
t.Error("new map should be empty.")
}
}
func TestInsert(t *testing.T) {
m := New()
elephant := Animal{"elephant"}
monkey := Animal{"monkey"}
m.Set("elephant", elephant)
m.Set("monkey", monkey)
if m.Count() != 2 {
t.Error("map should contain exactly two elements.")
}
}
func TestInsertAbsent(t *testing.T) {
m := New()
elephant := Animal{"elephant"}
monkey := Animal{"monkey"}
m.SetIfAbsent("elephant", elephant)
if ok := m.SetIfAbsent("elephant", monkey); ok {
t.Error("map set a new value even the entry is already present")
}
}
func TestGet(t *testing.T) {
m := New()
// Get a missing element.
val, ok := m.Get("Money")
if ok == true {
t.Error("ok should be false when item is missing from map.")
}
if val != nil {
t.Error("Missing values should return as null.")
}
elephant := Animal{"elephant"}
m.Set("elephant", elephant)
// Retrieve inserted element.
tmp, ok := m.Get("elephant")
elephant = tmp.(Animal) // Type assertion.
if ok == false {
t.Error("ok should be true for item stored within the map.")
}
if &elephant == nil {
t.Error("expecting an element, not null.")
}
if elephant.name != "elephant" {
t.Error("item was modified.")
}
}
func TestHas(t *testing.T) {
m := New()
// Get a missing element.
if m.Has("Money") == true {
t.Error("element shouldn't exists")
}
elephant := Animal{"elephant"}
m.Set("elephant", elephant)
if m.Has("elephant") == false {
t.Error("element exists, expecting Has to return True.")
}
}
func TestRemove(t *testing.T) {
m := New()
monkey := Animal{"monkey"}
m.Set("monkey", monkey)
m.Remove("monkey")
if m.Count() != 0 {
t.Error("Expecting count to be zero once item was removed.")
}
temp, ok := m.Get("monkey")
if ok != false {
t.Error("Expecting ok to be false for missing items.")
}
if temp != nil {
t.Error("Expecting item to be nil after its removal.")
}
// Remove a none existing element.
m.Remove("noone")
}
func TestPop(t *testing.T) {
m := New()
monkey := Animal{"monkey"}
m.Set("monkey", monkey)
v, exists := m.Pop("monkey")
if !exists {
t.Error("Pop didn't find a monkey.")
}
m1, ok := v.(Animal)
if !ok || m1 != monkey {
t.Error("Pop found something else, but monkey.")
}
v2, exists2 := m.Pop("monkey")
m1, ok = v2.(Animal)
if exists2 || ok || m1 == monkey {
t.Error("Pop keeps finding monkey")
}
if m.Count() != 0 {
t.Error("Expecting count to be zero once item was Pop'ed.")
}
temp, ok := m.Get("monkey")
if ok != false {
t.Error("Expecting ok to be false for missing items.")
}
if temp != nil {
t.Error("Expecting item to be nil after its removal.")
}
}
func TestCount(t *testing.T) {
m := New()
for i := 0; i < 100; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
if m.Count() != 100 {
t.Error("Expecting 100 element within map.")
}
}
func TestIsEmpty(t *testing.T) {
m := New()
if m.IsEmpty() == false {
t.Error("new map should be empty")
}
m.Set("elephant", Animal{"elephant"})
if m.IsEmpty() != false {
t.Error("map shouldn't be empty.")
}
}
func TestIterator(t *testing.T) {
m := New()
// Insert 100 elements.
for i := 0; i < 100; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
counter := 0
// Iterate over elements.
for item := range m.Iter() {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
}
if counter != 100 {
t.Error("We should have counted 100 elements.")
}
}
func TestBufferedIterator(t *testing.T) {
m := New()
// Insert 100 elements.
for i := 0; i < 100; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
counter := 0
// Iterate over elements.
for item := range m.IterBuffered() {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
}
if counter != 100 {
t.Error("We should have counted 100 elements.")
}
}
func TestIterCb(t *testing.T) {
m := New()
// Insert 100 elements.
for i := 0; i < 100; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
counter := 0
// Iterate over elements.
m.IterCb(func(key string, v interface{}) {
_, ok := v.(Animal)
if !ok {
t.Error("Expecting an animal object")
}
counter++
})
if counter != 100 {
t.Error("We should have counted 100 elements.")
}
}
func TestItems(t *testing.T) {
m := New()
// Insert 100 elements.
for i := 0; i < 100; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
items := m.Items()
if len(items) != 100 {
t.Error("We should have counted 100 elements.")
}
}
func TestConcurrent(t *testing.T) {
m := New()
ch := make(chan int)
const iterations = 1000
var a [iterations]int
// Using go routines insert 1000 ints into our map.
go func() {
for i := 0; i < iterations/2; i++ {
// Add item to map.
m.Set(strconv.Itoa(i), i)
// Retrieve item from map.
val, _ := m.Get(strconv.Itoa(i))
// Write to channel inserted value.
ch <- val.(int)
} // Call go routine with current index.
}()
go func() {
for i := iterations / 2; i < iterations; i++ {
// Add item to map.
m.Set(strconv.Itoa(i), i)
// Retrieve item from map.
val, _ := m.Get(strconv.Itoa(i))
// Write to channel inserted value.
ch <- val.(int)
} // Call go routine with current index.
}()
// Wait for all go routines to finish.
counter := 0
for elem := range ch {
a[counter] = elem
counter++
if counter == iterations {
break
}
}
// Sorts array, will make is simpler to verify all inserted values we're returned.
sort.Ints(a[0:iterations])
// Make sure map contains 1000 elements.
if m.Count() != iterations {
t.Error("Expecting 1000 elements.")
}
// Make sure all inserted values we're fetched from map.
for i := 0; i < iterations; i++ {
if i != a[i] {
t.Error("missing value", i)
}
}
}
func TestJsonMarshal(t *testing.T) {
SHARD_COUNT = 2
defer func() {
SHARD_COUNT = 32
}()
expected := "{\"a\":1,\"b\":2}"
m := New()
m.Set("a", 1)
m.Set("b", 2)
j, err := json.Marshal(m)
if err != nil {
t.Error(err)
}
if string(j) != expected {
t.Error("json", string(j), "differ from expected", expected)
return
}
}
func TestKeys(t *testing.T) {
m := New()
// Insert 100 elements.
for i := 0; i < 100; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
keys := m.Keys()
if len(keys) != 100 {
t.Error("We should have counted 100 elements.")
}
}
func TestMInsert(t *testing.T) {
animals := map[string]interface{}{
"elephant": Animal{"elephant"},
"monkey": Animal{"monkey"},
}
m := New()
m.MSet(animals)
if m.Count() != 2 {
t.Error("map should contain exactly two elements.")
}
}
func TestFnv32(t *testing.T) {
key := []byte("ABC")
hasher := fnv.New32()
hasher.Write(key)
if fnv32(string(key)) != hasher.Sum32() {
t.Errorf("Bundled fnv32 produced %d, expected result from hash/fnv32 is %d", fnv32(string(key)), hasher.Sum32())
}
}
func TestUpsert(t *testing.T) {
dolphin := Animal{"dolphin"}
whale := Animal{"whale"}
tiger := Animal{"tiger"}
lion := Animal{"lion"}
cb := func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
nv := newValue.(Animal)
if !exists {
return []Animal{nv}
}
res := valueInMap.([]Animal)
return append(res, nv)
}
m := New()
m.Set("marine", []Animal{dolphin})
m.Upsert("marine", whale, cb)
m.Upsert("predator", tiger, cb)
m.Upsert("predator", lion, cb)
if m.Count() != 2 {
t.Error("map should contain exactly two elements.")
}
compare := func(a, b []Animal) bool {
if a == nil || b == nil {
return false
}
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}
marineAnimals, ok := m.Get("marine")
if !ok || !compare(marineAnimals.([]Animal), []Animal{dolphin, whale}) {
t.Error("Set, then Upsert failed")
}
predators, ok := m.Get("predator")
if !ok || !compare(predators.([]Animal), []Animal{tiger, lion}) {
t.Error("Upsert, then Upsert failed")
}
}
func TestKeysWhenRemoving(t *testing.T) {
m := New()
// Insert 100 elements.
Total := 100
for i := 0; i < Total; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
// Remove 10 elements concurrently.
Num := 10
for i := 0; i < Num; i++ {
go func(c *ConcurrentMap, n int) {
c.Remove(strconv.Itoa(n))
}(&m, i)
}
keys := m.Keys()
for _, k := range keys {
if k == "" {
t.Error("Empty keys returned")
}
}
}
//
func TestUnDrainedIter(t *testing.T) {
m := New()
// Insert 100 elements.
Total := 100
for i := 0; i < Total; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
counter := 0
// Iterate over elements.
ch := m.Iter()
for item := range ch {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
if counter == 42 {
break
}
}
for i := Total; i < 2*Total; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
for item := range ch {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
}
if counter != 100 {
t.Error("We should have been right where we stopped")
}
counter = 0
for item := range m.IterBuffered() {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
}
if counter != 200 {
t.Error("We should have counted 200 elements.")
}
}
func TestUnDrainedIterBuffered(t *testing.T) {
m := New()
// Insert 100 elements.
Total := 100
for i := 0; i < Total; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
counter := 0
// Iterate over elements.
ch := m.IterBuffered()
for item := range ch {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
if counter == 42 {
break
}
}
for i := Total; i < 2*Total; i++ {
m.Set(strconv.Itoa(i), Animal{strconv.Itoa(i)})
}
for item := range ch {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
}
if counter != 100 {
t.Error("We should have been right where we stopped")
}
counter = 0
for item := range m.IterBuffered() {
val := item.Val
if val == nil {
t.Error("Expecting an object.")
}
counter++
}
if counter != 200 {
t.Error("We should have counted 200 elements.")
}
}