Distributed Cache

Content:

  1. Introduction
  2. Theory
  3. Example
  4. Hazelcast
  5. Infinispan
  6. Ignite
  7. Ignite via SQL
  8. memcached
  9. Redis

Introduction:

Caching is very important in many high-performance applications.

Theory

Idea:

In general a cache is a smaller but faster storage containing frequently used data from a larger but slower main storage.

For traditional applications that means the cache is data held in memory, while the main storage is disk-based persistence, usually some sort of database (relational or NoSQL).

Without cache:

read:  slow read database
write: slow write database

With cache:

read (data not in cache): slow read database, fast write cache
read (data in cache):     fast read cache
write:                    slow write database, fast write cache
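The read and write paths listed above can be sketched in a few lines of Java. This is only an illustration under simple assumptions: SlowStore stands in for the database, CachedStore for the application's data access code, and both names are made up for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the slow main storage (the database)
class SlowStore {
    private final Map<String,Integer> data = new HashMap<>();
    int reads = 0;
    int writes = 0;
    Integer read(String key) {
        reads++; // slow read database
        return data.get(key);
    }
    void write(String key, Integer value) {
        writes++; // slow write database
        data.put(key, value);
    }
}

// Hypothetical data access class that puts a cache in front of the store
class CachedStore {
    private final SlowStore db;
    private final Map<String,Integer> cache = new HashMap<>();
    CachedStore(SlowStore db) {
        this.db = db;
    }
    Integer read(String key) {
        Integer value = cache.get(key);   // fast read cache
        if(value == null) {
            value = db.read(key);         // data not in cache: slow read database
            if(value != null) {
                cache.put(key, value);    // fast write cache
            }
        }
        return value;
    }
    void write(String key, Integer value) {
        db.write(key, value);             // slow write database
        cache.put(key, value);            // fast write cache
    }
}

class CacheFlowSketch {
    public static void main(String[] args) {
        SlowStore db = new SlowStore();
        db.write("v", 123);
        CachedStore store = new CachedStore(db);
        store.read("v"); // not in cache, goes to the database
        store.read("v"); // in cache
        store.read("v"); // in cache
        System.out.println("database reads: " + db.reads); // prints: database reads: 1
    }
}
```

Only the first of the three reads reaches the database; the other two are served from the cache.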

A cache only improves performance significantly if a large portion of the reads can be served from the cache.
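A small calculation illustrates how strongly the benefit depends on the hit ratio. The costs used here (0.1 ms for a cache access, 10 ms for a database read) are assumed numbers picked for illustration, not measurements, and HitRatioSketch is a made-up name.

```java
class HitRatioSketch {
    // Assumed, purely illustrative costs in milliseconds
    static final double CACHE_MS = 0.1;
    static final double DB_MS = 10.0;
    // average cost of one read for a given hit ratio (0.0 to 1.0)
    static double averageReadMs(double hitRatio) {
        // hit: fast read cache, miss: slow read database + fast write cache
        return hitRatio * CACHE_MS + (1 - hitRatio) * (DB_MS + CACHE_MS);
    }
    public static void main(String[] args) {
        for(double h : new double[] { 0.0, 0.5, 0.9, 0.99 }) {
            System.out.printf("hit ratio %.2f : %.2f ms per read%n", h, averageReadMs(h));
        }
    }
}
```

With these assumed numbers a 50% hit ratio only roughly halves the average read cost, while a 99% hit ratio reduces it by a factor of about 50.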

Topologies:

The starting point is a normal application using a database:

No cache

If the application is too slow due to lots of queries hitting the database a cache can be introduced:

Cache single node

This is pretty simple. It is also very simple software wise as the cache is really just a global data structure in the application:

Cache single node - detailed
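The global data structure mentioned above can be as simple as a shared map. The following is a minimal sketch, assuming a string key and an integer value; the class name LocalCache is made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class LocalCache {
    // one shared instance for the whole application, safe for use from multiple threads
    private static final ConcurrentMap<String,Integer> CACHE = new ConcurrentHashMap<>();
    static Integer get(String key) {
        return CACHE.get(key);
    }
    static void put(String key, Integer value) {
        CACHE.put(key, value);
    }
    static void remove(String key) {
        CACHE.remove(key);
    }
    public static void main(String[] args) {
        put("v", 123);
        System.out.println(get("v")); // prints: 123
        remove("v");
        System.out.println(get("v")); // prints: null
    }
}
```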

The problems arise when switching from a single-node configuration to a multi-node cluster configuration.

Using the same model as for single node does not work as the cache on one node may contain obsolete data, because an update has happened on another node.
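The obsolete data problem can be reproduced in a single process. In this sketch two AppNode objects (a made-up name) simulate two cluster nodes that share a database, here just a map, while each keeps its own private cache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of one application node with its own local cache
class AppNode {
    private final Map<String,Integer> db;                       // shared database
    private final Map<String,Integer> cache = new HashMap<>();  // private to this node
    AppNode(Map<String,Integer> db) {
        this.db = db;
    }
    Integer read(String key) {
        // use the local cache, fall back to the database on a miss
        return cache.computeIfAbsent(key, db::get);
    }
    void write(String key, Integer value) {
        db.put(key, value);
        cache.put(key, value); // only this node's cache is updated
    }
}

class StaleCacheSketch {
    public static void main(String[] args) {
        Map<String,Integer> db = new HashMap<>();
        db.put("v", 1);
        AppNode a = new AppNode(db);
        AppNode b = new AppNode(db);
        System.out.println(b.read("v")); // prints: 1 (now cached on node b)
        a.write("v", 2);                 // update happens on node a
        System.out.println(a.read("v")); // prints: 2
        System.out.println(b.read("v")); // prints: 1 (obsolete)
    }
}
```

Node b keeps returning the old value because nothing tells its cache about the update made on node a.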

Instead a single distributed cache spanning all nodes in the cluster is needed:

Cache multi node cluster

Software-wise this requires much more complicated logic, as the cache needs to synchronize between nodes:

Cache multi node cluster - detailed

An alternative approach is to move the cache out into separate cache servers:

Cache server

Software wise that looks like:

Cache server - detailed

Separate cache servers are often preferred over an in-application distributed cache.

Reasons:

There are commercial cache servers, including:

There are open source cache servers, including:

Concepts:

Cache can work in two modes:

write through mode: updates get written to main storage immediately
write back mode: updates get written to main storage with some delay

Even though write back gives better performance than write through, I will usually recommend write through to avoid the risk of data loss.
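The difference between the two modes, and where the risk of data loss comes from, can be sketched as follows. The class names are made up, the main storage is simulated by a map, and the delayed write is reduced to an explicit flush() call that a real cache would trigger from a timer or similar.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical write through cache: main storage is updated as part of every put
class WriteThroughCache {
    final Map<String,Integer> cache = new HashMap<>();
    final Map<String,Integer> storage;
    WriteThroughCache(Map<String,Integer> storage) {
        this.storage = storage;
    }
    void put(String key, Integer value) {
        storage.put(key, value); // written to main storage immediately
        cache.put(key, value);
    }
}

// Hypothetical write back cache: main storage is updated later
class WriteBackCache {
    final Map<String,Integer> cache = new HashMap<>();
    final Map<String,Integer> dirty = new LinkedHashMap<>(); // updates not yet in main storage
    final Map<String,Integer> storage;
    WriteBackCache(Map<String,Integer> storage) {
        this.storage = storage;
    }
    void put(String key, Integer value) {
        cache.put(key, value);
        dirty.put(key, value); // remembered for a later write
    }
    // called with some delay; a crash before this call loses the dirty entries
    void flush() {
        storage.putAll(dirty);
        dirty.clear();
    }
}
```

With WriteBackCache a put returns without touching the main storage, which is why it is faster, but everything in dirty is lost if the process dies before flush() runs.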

There are two distinct situations for cache usage. One is where the cache has the same size as the persistent storage behind it, in which case all data can be in the cache. The other is where the cache is much smaller than the persistent storage behind it, in which case only a small portion of the data can be in the cache.

In the latter case the cache needs to be configured for how to handle that. Options can be quite complex but usually include:

maximum cache size: the maximum amount of memory that can be allocated for the cache
eviction policy: how to determine which entries to evict from the cache when the maximum size is reached; a frequently used option is "least recently used"

Data being evicted should never be a problem for the application, because data is always available from the persistent storage behind.
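To make "least recently used" concrete, here is a minimal sketch based on java.util.LinkedHashMap in access order. As a simplification the maximum size is a number of entries rather than an amount of memory, and the class name LruCache is made up; real cache servers implement eviction internally and configure it differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LruCache<K,V> extends LinkedHashMap<K,V> {
    private final int maxEntries;
    LruCache(int maxEntries) {
        super(16, 0.75f, true); // true = keep entries in access order
        this.maxEntries = maxEntries;
    }
    @Override
    protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
        // evict the least recently used entry when the maximum size is exceeded
        return size() > maxEntries;
    }
}

class LruSketch {
    public static void main(String[] args) {
        LruCache<String,Integer> cache = new LruCache<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // "a" is now more recently used than "b"
        cache.put("c", 3); // exceeds the maximum, so "b" is evicted
        System.out.println(cache.keySet()); // prints: [a, c]
    }
}
```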

I will not go into details about this configuration as the optimal configuration is very dependent on the specific domain.

Most cache servers can, if configured to do so, persist data to disk.

I do not consider persistence in cache server an important feature, because it takes away all or some of the performance benefits of the cache, which was the original reason to use cache.

Functionality:

A cache server operates with a key and a value.

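The basic operations of a cache server are:

put (store a value under a key)
get (retrieve the value for a key)
remove (delete the entry for a key)

Optional operations of a cache server are:

list all entries
subscribe to changes
concurrency handling (locking, replace with check, atomic increment)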

In many ways the functionality of a distributed cache is very similar to the functionality of a NoSQL Key Value Store.

The main difference is that a NoSQL Key Value Store always persists to disk, while a cache server always stores in memory and optionally writes back to disk.

Example

Here are some examples using commonly used cache servers and clients in various languages.

The cache servers used in examples are all open source.

The examples are relatively simple but should be sufficient to show the basic usage.

The examples will not cover cluster configuration of the server. See each server's documentation for how to set that up.

All examples will use a string key and an integer value. The value could also be a string or a serialized object.

Hazelcast

Name: Hazelcast
History: Open source cache server first released in 2009
Platform: Java (should in theory run on any OS with Java)
Server capabilities:
  embedded cache (JVM languages only) and standalone cache server
  cluster support
  persistence support
Client languages:
  Java and other JVM languages, C# and other .NET languages, Python, C++ etc.
  memcached protocol
  RESTful HTTP API
Optional features:
  list all
  subscribe to changes
  concurrency handling (locking, replace with check, atomic increment)
Namespace: named caches
Other:
  JCache API support
  transaction support

Examples:

Hazelcast comes with a Java client library.

Functionality:

package cache.hazelcast;

import java.util.Iterator;
import java.util.Map.Entry;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.map.listener.EntryAddedListener;
import com.hazelcast.map.listener.EntryRemovedListener;

public class TestFunctional {
    private static void test(IMap<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all (old style)
        Iterator<Entry<String,Integer>> it = cache.entrySet().iterator();
        while(it.hasNext()) {
            Entry<String,Integer> e = it.next();
            System.out.printf("%s = %d\n", e.getKey(), e.getValue());
        }
        // list all (new style)
        cache.forEach((k,v) -> System.out.printf("%s = %d\n", k, v));
        // listener
        cache.addEntryListener(new EntryAddedListener<String,Integer>() {
            @Override
            public void entryAdded(EntryEvent<String,Integer> ee) {
                System.out.printf("Add : %s = %d\n", ee.getKey(), ee.getValue());
            }
        }, true);
        cache.addEntryListener(new EntryRemovedListener<String,Integer>() {
            @Override
            public void entryRemoved(EntryEvent<String,Integer> ee) {
                System.out.printf("Remove : %s\n", ee.getKey());
            }
        }, true);
        cache.put("x", 123);
        cache.remove("x");
        Thread.sleep(10); // ensure sufficient time to call listener 
    }
    public static void main(String[] args) throws Exception {
        ClientConfig cfg = new ClientConfig();
        HazelcastInstance cli = HazelcastClient.newHazelcastClient(cfg);
        IMap<String,Integer> cache = cli.getMap("functional_cache");
        test(cache);
        cli.shutdown();
    }
}

Performance:

package cache.hazelcast;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(IMap<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        ClientConfig cfg = new ClientConfig();
        HazelcastInstance cli = HazelcastClient.newHazelcastClient(cfg);
        IMap<String,Integer> cache = cli.getMap("performance_cache");
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cli.shutdown();
    }
}
7127 puts per second
9940 puts per second
10460 puts per second
10405 puts per second
10504 puts per second

Concurrency:

package cache.hazelcast;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private IMap<String,Integer> cache;
        public Test1(IMap<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test2 implements Runnable {
        private IMap<String,Integer> cache;
        private ILock lck;
        public Test2(IMap<String,Integer> cache, ILock lck) {
            this.cache = cache;
            this.lck = lck;
        }
        @Override
        public void run() {
            lck.lock();
            try {
                int hot = cache.get("hot");
                hot = hot + 1;
                cache.put("hot", hot);
            } finally {
                lck.unlock(); // always release the lock, even if get or put throws
            }
        }
    }
    private static class Test3 implements Runnable {
        private IMap<String,Integer> cache;
        public Test3(IMap<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static class Test4 implements Runnable {
        private IAtomicLong counter;
        public Test4(IAtomicLong counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            counter.addAndGet(1);
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(IMap<String,Integer> cache, String lbl, Runnable action, IAtomicLong counter) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        if(counter != null) {
            cache.put("hot",  (int)counter.get());
        }
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        ClientConfig cfg = new ClientConfig();
        HazelcastInstance cli = HazelcastClient.newHazelcastClient(cfg);
        IMap<String,Integer> cache = cli.getMap("concurrency_cache");
        ILock lck = cli.getLock("hot");
        IAtomicLong counter = cli.getAtomicLong("superhot");
        counter.set(0);
        test(cache, "No handling", new Test1(cache), null);
        test(cache, "Locking", new Test2(cache, lck), null);
        test(cache, "Replace with check", new Test3(cache), null);
        test(cache, "Atomic increment", new Test4(counter), counter);
        cli.shutdown();
    }
}

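Note that both locking and atomic increment have changed in recent Hazelcast versions: from Hazelcast 4.0 the lock and the atomic long are obtained via the CP subsystem (cli.getCPSubsystem().getLock("hot") and cli.getCPSubsystem().getAtomicLong("superhot")) instead of directly from the HazelcastInstance.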

No handling : 214 = 10000 (424 ms)
Locking : 10000 = 10000 (3355 ms)
Replace with check : 10000 = 10000 (4711 ms)
Atomic increment : 10000 = 10000 (86 ms)

Hazelcast supports the standard Java cache API JCache defined in JSR 107 from 2014.

Functionality:

package cache.jcache;

import java.util.Iterator;

import javax.cache.Cache;
import javax.cache.Cache.Entry;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.spi.CachingProvider;

public class TestFunctional {
    private static class MyListener implements CacheEntryCreatedListener<String,Integer>, CacheEntryRemovedListener<String,Integer> {
        @Override
        public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> itcee) throws CacheEntryListenerException {
            for(CacheEntryEvent<? extends String, ? extends Integer> cee : itcee) {
                System.out.printf("Add : %s = %d\n", cee.getKey(), cee.getValue());
            }
        }
        @Override
        public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> itcee) throws CacheEntryListenerException {
            for(CacheEntryEvent<? extends String, ? extends Integer> cee : itcee) {
                System.out.printf("Remove : %s\n", cee.getKey());
            }
        }
    }
    private static void test(Cache<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all (old style)
        Iterator<Entry<String,Integer>> it = cache.iterator();
        while(it.hasNext()) {
            Entry<String,Integer> e = it.next();
            System.out.printf("%s = %d\n", e.getKey(), e.getValue());
        }
        // list all (new style)
        cache.forEach(e -> System.out.printf("%s = %d\n", e.getKey(), e.getValue()));
        // listener
        CacheEntryListenerConfiguration<String,Integer> celc = new MutableCacheEntryListenerConfiguration<String,Integer>(MyListener::new, null, false, true);
        cache.registerCacheEntryListener(celc);
        cache.put("x", 123);
        cache.remove("x");
        Thread.sleep(10); // ensure sufficient time to call listener 
    }
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("com.hazelcast.cache.HazelcastCachingProvider");
        CacheManager cm = cp.getCacheManager();
        Cache<String,Integer> cache = cm.getCache("functional_cache");
        if(cache == null) {
            cache = cm.createCache("functional_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache);
        cache.close();
        cm.close();
        cp.close();
    }
}

Note that a cache with name "foobar" accessed via the Hazelcast API is not the same cache as a cache with name "foobar" accessed via the JCache API.

Performance:

package cache.jcache;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Cache<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("com.hazelcast.cache.HazelcastCachingProvider");
        CacheManager cm = cp.getCacheManager();
        Cache<String,Integer> cache = cm.getCache("performance_cache");
        if(cache == null) {
            cache = cm.createCache("performance_cache", new MutableConfiguration<String,Integer>());
        }
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cache.close();
        cm.close();
        cp.close();
    }
}
6724 puts per second
9551 puts per second
9615 puts per second
9615 puts per second
9615 puts per second

Concurrency:

Of the paradigms shown above, the JCache API only supports replace with check.

package cache.jcache;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private Cache<String,Integer> cache;
        public Test1(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test3 implements Runnable {
        private Cache<String,Integer> cache;
        public Test3(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(Cache<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("com.hazelcast.cache.HazelcastCachingProvider");
        CacheManager cm = cp.getCacheManager();
        Cache<String,Integer> cache = cm.getCache("concurrency_cache");
        if(cache == null) {
            cache = cm.createCache("concurrency_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache, "No handling", new Test1(cache));
        test(cache, "Replace with check", new Test3(cache));
        cache.close();
        cm.close();
        cp.close();
    }
}
No handling : 195 = 10000 (475 ms)
Replace with check : 10000 = 10000 (3748 ms)
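The replace with check loop used in Test3 can also be tried without any cache server. The sketch below is not JCache or Hazelcast code: it uses java.util.concurrent.ConcurrentHashMap as a stand-in, because its replace(key, oldValue, newValue) method has the same "only replace if the value is still what I read" semantics. The class name ReplaceWithCheckSketch is made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReplaceWithCheckSketch {
    static final int NTHREADS = 100;
    static final int NOPS = 10000;
    // increment "hot" NOPS times using NTHREADS threads and return the final value
    static int run(ConcurrentMap<String,Integer> cache) {
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(() -> {
                boolean done = false;
                while(!done) {
                    Integer hot1 = cache.get("hot");
                    Integer hot2 = hot1 + 1;
                    // succeeds only if nobody changed the value since the get, otherwise retry
                    done = cache.replace("hot", hot1, hot2);
                }
            });
        }
        es.shutdown();
        try {
            es.awaitTermination(300, TimeUnit.SECONDS);
        } catch(InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return cache.get("hot");
    }
    public static void main(String[] args) {
        System.out.println(run(new ConcurrentHashMap<>())); // prints: 10000
    }
}
```

No update is lost because a failed replace simply retries with a fresh value. The retries are also the reason this paradigm gets slower the more threads compete for the same key.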

There is an upcoming Jakarta EE (formerly Java EE) standard for NoSQL databases: Jakarta NoSQL. The reference implementation is Eclipse JNoSQL.

Functionality:

package cache.hazelcast.jnosql;

import java.util.Map;

import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.hazelcast.keyvalue.HazelcastKeyValueConfiguration;

public class TestFunctional {
    private static void test(Map<String,Integer> cache) {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all not supported
        // listener not supported
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new HazelcastKeyValueConfiguration();
        BucketManagerFactory bmf = cfg.get();
        Map<String,Integer> cache = bmf.getMap("functional_cache", String.class, Integer.class);
        test(cache);
        bmf.close(); // does not shutdown Hazelcast instance    :-(
    }
}

Performance:

package cache.hazelcast.jnosql;

import java.util.Map;

import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.hazelcast.keyvalue.HazelcastKeyValueConfiguration;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Map<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new HazelcastKeyValueConfiguration();
        BucketManagerFactory bmf = cfg.get();
        Map<String,Integer> cache = bmf.getMap("performance_cache", String.class, Integer.class);
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        bmf.close(); // does not shutdown Hazelcast instance    :-(
    }
}
5564 puts per second
8474 puts per second
8756 puts per second
8826 puts per second
8779 puts per second

Concurrency:

The JNoSQL API does not currently support any of the concurrency paradigms.

package cache.hazelcast.jnosql;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.hazelcast.keyvalue.HazelcastKeyValueConfiguration;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private Map<String,Integer> cache;
        public Test1(Map<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(Map<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new HazelcastKeyValueConfiguration();
        BucketManagerFactory bmf = cfg.get();
        Map<String,Integer> cache = bmf.getMap("concurrency_cache", String.class, Integer.class);
        test(cache, "No handling", new Test1(cache));
        bmf.close(); // does not shutdown Hazelcast instance    :-(
    }
}
No handling : 233 = 10000 (803 ms)

Hazelcast.Net is available via NuGet.

Functionality:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

using Hazelcast.Core;
using Hazelcast.Client;

namespace Cache.Hazelcast.Functional
{
    public class MyEntryAddedListener : EntryAddedListener<string,int?>
    {
        public void EntryAdded(EntryEvent<string,int?> ee)
        {
            Console.WriteLine("Add : {0} = {1}", ee.GetKey(), ee.GetValue());   
        }
    }
    public class MyEntryRemovedListener : EntryRemovedListener<string,int?>
    {
        public void EntryRemoved(EntryEvent<string,int?> ee)
        {
            Console.WriteLine("Remove : {0}", ee.GetKey());
        }
    }
    public class Program
    {
        private static void Test(IMap<string,int?> cache)
        {
            // basic put, get and remove
            cache.Put("v", 123);
            Console.WriteLine(cache.Get("v"));
            cache.Remove("v");
            Console.WriteLine(cache.Get("v"));
            cache.Put("v", 123);
            Console.WriteLine(cache.Get("v"));
            // persistence
            int? counter = cache.Get("counter");
            if(counter == null)
            {
                counter = 0;
            }
            counter++;
            Console.WriteLine(counter);
            cache.Put("counter", counter);
            // list all
            foreach(KeyValuePair<string,int?> kvp in cache.EntrySet())
            {
                Console.WriteLine("{0} = {1}", kvp.Key, kvp.Value);
            }
            // listener
            cache.AddEntryListener(new MyEntryAddedListener(), true);
            cache.AddEntryListener(new MyEntryRemovedListener(), true);
            cache.Put("x", 123);
            cache.Remove("x");
            Thread.Sleep(10); // ensure sufficient time to call listener 
        }
        public static void Main(string[] args)
        {
            IHazelcastInstance cli = HazelcastClient.NewHazelcastClient();
            IMap<string,int?> cache = cli.GetMap<string,int?>("functional_cache");
            Test(cache);
            cli.Shutdown();
        }
    }
}

Performance:

using System;

using Hazelcast.Core;
using Hazelcast.Client;

namespace Cache.Hazelcast.Performance
{
    public class Program
    {
        private const int N = 10000;
        private static void Test(IMap<string,int?> cache)
        {
            // put speed
            DateTime dt1 = DateTime.Now;
            for(int i = 0; i < N; i++)
            {
                cache.Put("K#" + (i + 1), i + 1);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} puts per second", N * 1000 / (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int REP = 5;
        public static void Main(string[] args)
        {
            IHazelcastInstance cli = HazelcastClient.NewHazelcastClient();
            IMap<string,int?> cache = cli.GetMap<string,int?>("performance_cache");
            for(int i = 0; i < REP; i++)
            {
                Test(cache);
            }
            cli.Shutdown();
        }
    }
}
6199 puts per second
6337 puts per second
6345 puts per second
6253 puts per second
6353 puts per second

Concurrency:

using System;
using System.Threading;

using Hazelcast.Core;
using Hazelcast.Client;

namespace Cache.Hazelcast.Concurrency
{
    public class Program
    {
        private delegate void F(IMap<string,int?> cache);
        private static void Test1(IMap<string,int?> cache) 
        {
            int? hot = cache.Get("hot");
            hot = hot + 1;
            cache.Put("hot", hot);
        }
        private static void Test2(IMap<string,int?> cache, ILock lck) 
        {
            lck.Lock();
            try
            {
                int? hot = cache.Get("hot");
                hot = hot + 1;
                cache.Put("hot", hot);
            }
            finally
            {
                lck.Unlock(); // always release the lock, even if Get or Put throws
            }
        }
        private static void Test3(IMap<string,int?> cache) 
        {
            bool done = false;
            while(!done)
            {
                int? hot1 = cache.Get("hot");
                int? hot2 = hot1 + 1;
                done = cache.Replace("hot", hot1, hot2);
            }
        }
        private static void Test4(IAtomicLong counter) 
        {
            counter.AddAndGet(1);
        }
        private static void Testn(IMap<string,int?> cache, int nrep, F action)
        {
            for(int i = 0; i < nrep; i++)
            {
                action(cache);
            }
        }
        private const int NTHREADS = 100;
        private const int NOPS = 10000;
        private static void Test(IMap<string,int?> cache, string lbl, F action, IAtomicLong counter)
        {
            DateTime dt1 = DateTime.Now;
            cache.Put("hot", 0);
            Thread[] t = new Thread[NTHREADS];
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i] = new Thread(() => Testn(cache, NOPS/NTHREADS, action));
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Start();
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Join();
            }
            if(counter != null)
            {
                cache.Put("hot", (int)counter.Get());
            }
            int? hot = cache.Get("hot");
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} : {1} = {2} ({3} ms)", lbl, hot, NOPS, (int)((dt2 - dt1).TotalMilliseconds));
        }
        public static void Main(string[] args)
        {
            IHazelcastInstance cli = HazelcastClient.NewHazelcastClient();
            IMap<string,int?> cache = cli.GetMap<string,int?>("concurrency_cache");
            ILock lck = cli.GetLock("hot");
            IAtomicLong counter = cli.GetAtomicLong("superhot");
            counter.Set(0);
            Test(cache, "No handling", c => Test1(c), null);
            Test(cache, "Locking", c => Test2(c, lck), null);
            Test(cache, "Replace with check", c => Test3(c), null);
            Test(cache, "Atomic increment", c => Test4(counter), counter);
            cli.Shutdown();
        }
    }
}
No handling : 410 = 10000 (2246 ms)
Locking : 10000 = 10000 (7041 ms)
Replace with check : 10000 = 10000 (14075 ms)
Atomic increment : 10000 = 10000 (428 ms)

hazelcast-python-client can be installed via pip.

Functionality:

import hazelcast
import time

cfg = hazelcast.ClientConfig()
cli = hazelcast.HazelcastClient(cfg)
cache = cli.get_map("functional_cache")

# basic put, get and remove
cache.put("v", 123)
print(cache.get("v").result())
cache.remove("v")
print(cache.get("v").result())
cache.put("v", 123)
print(cache.get("v").result())

# persistence
counter = cache.get("counter").result()
if counter is None:
    counter = 0
counter = counter + 1
print(counter)
cache.put("counter", counter)

# list all
for e in cache.entry_set().result():
    print("%s = %d" % (e[0],e[1]))

# listener
def added(ee):
    print('Add : %s = %d' % (ee.key, ee.value))
def removed(ee):
    print('Remove : %s' % (ee.key))
cache.add_entry_listener(include_value=True, added_func=added)
cache.add_entry_listener(removed_func=removed)
cache.put("x", 123)
cache.remove("x")
time.sleep(1) # ensure sufficient time to call listener 

cli.shutdown()

Performance:

import hazelcast
import time

cfg = hazelcast.ClientConfig()
cli = hazelcast.HazelcastClient(cfg)
cache = cli.get_map("performance_cache")

# put speed
N = 10000
for j in range(5):
    t1 = time.time()
    for i in range(N):
        cache.put("K#" + str(i + 1), i + 1)
    t2 = time.time()
    print("%d puts per second" % (N / (t2 - t1)))

cli.shutdown()
3454 puts per second
3455 puts per second
3460 puts per second
4362 puts per second
5143 puts per second

Concurrency:

import hazelcast
import multiprocessing
import time

NTHREADS = 100
NOPS = 10000

def test1():
    cfg = hazelcast.ClientConfig()
    cli = hazelcast.HazelcastClient(cfg)
    cache = cli.get_map("performance_cache")
    for i in range(NOPS//NTHREADS):
        hot = cache.get("hot").result()
        hot = hot + 1
        cache.put("hot", hot)
    cli.shutdown()

def test2():
    cfg = hazelcast.ClientConfig()
    cli = hazelcast.HazelcastClient(cfg)
    cache = cli.get_map("performance_cache")
    lck = cli.get_lock("hot").blocking()
    for i in range(NOPS//NTHREADS):
        lck.lock()
        hot = cache.get("hot").result()
        hot = hot + 1
        cache.put("hot", hot)
        lck.unlock()
    cli.shutdown()

def test3():
    cfg = hazelcast.ClientConfig()
    cli = hazelcast.HazelcastClient(cfg)
    cache = cli.get_map("performance_cache")
    for i in range(NOPS//NTHREADS):
        done = False
        while not done:
            hot1 = cache.get("hot").result()
            hot2 = hot1 + 1
            done = cache.replace_if_same("hot", hot1, hot2).result()
    cli.shutdown()

def test4():
    cfg = hazelcast.ClientConfig()
    cli = hazelcast.HazelcastClient(cfg)
    cache = cli.get_map("performance_cache")
    counter = cli.get_atomic_long("superhot").blocking()
    for i in range(NOPS//NTHREADS):
        counter.add_and_get(1)
    cli.shutdown()

def test(lbl, f, usecounter):
    cfg = hazelcast.ClientConfig()
    cli = hazelcast.HazelcastClient(cfg)
    cache = cli.get_map("performance_cache")
    t1 = time.time()
    cache.put("hot", 0)
    if usecounter:
        counter = cli.get_atomic_long("superhot").blocking()
        counter.set(0)
    t = []
    for i in range(NTHREADS):
        t.append(multiprocessing.Process(target=f, args=()))
    for i in range(NTHREADS):
        t[i].start()
    for i in range(NTHREADS):
        t[i].join()
    if usecounter:
        cache.put("hot", counter.get())
    hot = cache.get("hot").result()
    t2 = time.time()
    print("%s : %d = %d (%d ms)" % (lbl, hot, NOPS, (t2 - t1) * 1000))
    cli.shutdown()

if __name__ == '__main__':
    test("No handling", test1, False)
    test("Locking", test2, False)
    test("Replace with check", test3, False)
    test("Atomic increment", test4, True)
No handling : 3245 = 10000 (19094 ms)
Locking : 10000 = 10000 (29471 ms)
Replace with check : 10000 = 10000 (49834 ms)
Atomic increment : 10000 = 10000 (18641 ms)

Note that comparing the speed of the Python test with the other languages is misleading, as the Python test uses multiple processes (each with its own client connection) while the tests in the other languages use threads.
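In all the concurrency results the "No handling" variant ends far below the expected 10000. The reason is that read, increment and write are three separate steps, so two clients can read the same value and both write back the same result. A minimal sketch of that interleaving, using a plain dict as a stand-in for the remote cache (the dict and the read/write helpers are assumptions for illustration, not part of any client library):

```python
# In-memory stand-in for a remote cache (illustration only).
cache = {"hot": 0}

def read(key):
    return cache[key]

def write(key, value):
    cache[key] = value

# Two clients, A and B, each want to increment "hot" once.
# Unlucky interleaving: both read before either writes.
a = read("hot")      # A sees 0
b = read("hot")      # B sees 0
write("hot", a + 1)  # A writes 1
write("hot", b + 1)  # B also writes 1, overwriting A's update

expected = 2
lost = expected - cache["hot"]
print("hot = %d, lost updates = %d" % (cache["hot"], lost))
```

Locking, replace with check and atomic increment are three different ways of preventing this interleaving.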

Infinispan

Name: Infinispan
History:
    Open source cache server first released in 2009
    Originates from JBoss/Red Hat
Platform: Java (should in theory run on any OS with Java)
Server capabilities:
    embedded cache (JVM languages only) and standalone cache server
    cluster support
    persistence support
Client languages: Java and other JVM languages, C# and other .NET languages, Python, C++ etc.
Optional features:
    list all
    subscribe to changes
    concurrency handling (locking, replace with check, atomic increment)
Namespace: named caches
Other:
    JCache API support
    transaction support

Examples:

Infinispan comes with a Java client library.

Functionality:

package cache.infinispan;

import java.util.Iterator;
import java.util.Map.Entry;

import org.infinispan.client.hotrod.DefaultTemplate;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;

public class TestFunctional {
    @ClientListener
    private static class MyListener {
        @ClientCacheEntryCreated
        public void entryAdded(ClientCacheEntryCreatedEvent<String> e) {
            System.out.printf("Add : %s\n", e.getKey());
        }
        @ClientCacheEntryRemoved
        public void entryRemoved(ClientCacheEntryRemovedEvent<String> e) {
            System.out.printf("Remove : %s\n", e.getKey());
        }
    }
    private static void test(RemoteCache<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all (old style)
        Iterator<Entry<String,Integer>> it = cache.entrySet().iterator();
        while(it.hasNext()) {
            Entry<String,Integer> e = it.next();
            System.out.printf("%s = %d\n", e.getKey(), e.getValue());
        }
        // list all (new style)
        cache.forEach((k,v) -> System.out.printf("%s = %d\n", k, v));
        // listener
        cache.addClientListener(new MyListener());
        cache.put("x", 123);
        cache.remove("x");
        Thread.sleep(10); // ensure sufficient time to call listener 
    }
    public static void main(String[] args) throws Exception {
        ConfigurationBuilder b = new ConfigurationBuilder();
        b.addServer().host("localhost").port(ConfigurationProperties.DEFAULT_HOTROD_PORT);
        RemoteCacheManager rcm = new RemoteCacheManager(b.build());
        RemoteCache<String,Integer> cache = rcm.administration().getOrCreateCache("functional_cache", DefaultTemplate.DIST_SYNC);
        test(cache);
        rcm.stop();
        rcm.close();
    }
}

Performance:

package cache.infinispan;

import org.infinispan.client.hotrod.DefaultTemplate;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(RemoteCache<String,Integer> cache) throws InterruptedException {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        ConfigurationBuilder b = new ConfigurationBuilder();
        b.addServer().host("localhost").port(ConfigurationProperties.DEFAULT_HOTROD_PORT);
        RemoteCacheManager rcm = new RemoteCacheManager(b.build());
        RemoteCache<String,Integer> cache = rcm.administration().getOrCreateCache("performance_cache", DefaultTemplate.DIST_SYNC);
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        rcm.stop();
        rcm.close();
    }
}
6666 puts per second
11614 puts per second
13089 puts per second
13642 puts per second
13550 puts per second
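In these runs the first repetition is clearly slower than the later ones (6666 versus about 13000 puts per second). A plausible explanation, which is not verified here, is warm-up such as connection setup and JIT compilation. A minimal sketch of a measurement helper that drops the first repetition and reports the median of the rest, assuming a plain dict as a stand-in for the cache (the helper names puts_per_second and steady_state_rate are made up for this illustration):

```python
import statistics
import time

def puts_per_second(put, n):
    """Time n puts using a monotonic high-resolution clock and return the rate."""
    t1 = time.perf_counter()
    for i in range(n):
        put("K#" + str(i + 1), i + 1)
    t2 = time.perf_counter()
    return n / (t2 - t1)

def steady_state_rate(put, n, reps):
    """Run reps repetitions, skip the first (warm-up) and return (median, all rates)."""
    rates = [puts_per_second(put, n) for _ in range(reps)]
    return statistics.median(rates[1:]), rates

# A dict stands in for the cache; a real test would pass the client's put method.
store = {}
rate, rates = steady_state_rate(store.__setitem__, 10000, 5)
print("%d puts per second (median of last %d runs)" % (rate, len(rates) - 1))
```

With a real client the put argument would be the cache's put method, and the numbers would of course be dominated by network round trips rather than by the dict.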

Concurrency:

package cache.infinispan;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.infinispan.client.hotrod.DefaultTemplate;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.RemoteCounterManagerFactory;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.CounterType;
import org.infinispan.counter.api.StrongCounter;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private RemoteCache<String,Integer> cache;
        public Test1(RemoteCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test3 implements Runnable {
        private RemoteCache<String,Integer> cache;
        public Test3(RemoteCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static class Test4 implements Runnable {
        private StrongCounter counter;
        public Test4(StrongCounter counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            try {
                counter.addAndGet(1).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(RemoteCache<String,Integer> cache, String lbl, Runnable action, StrongCounter counter) throws InterruptedException, ExecutionException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        if(counter != null) {
            cache.put("hot",  (int)(long)counter.getValue().get());
        }
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        ConfigurationBuilder b = new ConfigurationBuilder();
        b.addServer().host("localhost").port(ConfigurationProperties.DEFAULT_HOTROD_PORT);
        RemoteCacheManager rcm = new RemoteCacheManager(b.build());
        RemoteCache<String,Integer> cache = rcm.administration().getOrCreateCache("concurrency_cache", DefaultTemplate.DIST_SYNC);
        CounterManager cm = RemoteCounterManagerFactory.asCounterManager(rcm);
        cm.defineCounter("superhot", CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG).initialValue(0).build());
        StrongCounter counter = cm.getStrongCounter("superhot");
        counter.reset().get();
        test(cache, "No handling", new Test1(cache), null);
        // Infinispan only supports locking for embedded cache - not for remote access to standalone cache server
        test(cache, "Replace with check", new Test3(cache), null);
        test(cache, "Atomic increment", new Test4(counter), counter);
        rcm.stop();
        rcm.close();
    }
}
No handling : 314 = 10000 (1822 ms)
Replace with check : 10000 = 10000 (9487 ms)
Atomic increment : 10000 = 10000 (876 ms)

Infinispan supports the standard Java cache API JCache defined in JSR 107 from 2014.

Functionality:

package cache.jcache;

import java.util.Iterator;

import javax.cache.Cache;
import javax.cache.Cache.Entry;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class TestFunctional {
    private static void test(Cache<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all (old style)
        Iterator<Entry<String,Integer>> it = cache.iterator();
        while(it.hasNext()) {
            Entry<String,Integer> e = it.next();
            System.out.printf("%s = %d\n", e.getKey(), e.getValue());
        }
        // list all (new style)
        cache.forEach(e -> System.out.printf("%s = %d\n", e.getKey(), e.getValue()));
        // listener does not work in 10.1.7
    }
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("org.infinispan.jcache.remote.JCachingProvider");
        CacheManager cm = cp.getCacheManager();
        Cache<String,Integer> cache = cm.getCache("functional_cache");
        if(cache == null) {
            cache = cm.createCache("functional_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache);
        cache.close();
        cm.close();
        cp.close();
    }
}

Performance:

package cache.jcache;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Cache<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("org.infinispan.jcache.remote.JCachingProvider");
        CacheManager cm = cp.getCacheManager();
        Cache<String,Integer> cache = cm.getCache("performance_cache");
        if(cache == null) {
            cache = cm.createCache("performance_cache", new MutableConfiguration<String,Integer>());
        }
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cache.close();
        cm.close();
        cp.close();
    }
}
4011 puts per second
6349 puts per second
6357 puts per second
6510 puts per second
6574 puts per second

Concurrency:

The JCache API only supports the replace with check paradigm.

package cache.jcache;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private Cache<String,Integer> cache;
        public Test1(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test3 implements Runnable {
        private Cache<String,Integer> cache;
        public Test3(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(Cache<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("org.infinispan.jcache.remote.JCachingProvider");
        CacheManager cm = cp.getCacheManager();
        Cache<String,Integer> cache = cm.getCache("concurrency_cache");
        if(cache == null) {
            cache = cm.createCache("concurrency_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache, "No handling", new Test1(cache));
        test(cache, "Replace with check", new Test3(cache));
        cache.close();
        cm.close();
        cp.close();
    }
}
No handling : 118 = 10000 (2117 ms)
Replace with check : 10000 = 10000 (14624 ms)
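The replace with check loop in Test3 is an optimistic retry: read the value, compute the new value, and ask the cache to store it only if the old value is still in place. A minimal sketch of the idea without a cache server, assuming an in-memory class CasMap as a stand-in (CasMap, increment and worker are hypothetical names, and the internal lock only plays the role of the server's atomic replace):

```python
import threading

class CasMap:
    """In-memory stand-in for a cache offering an atomic replace with check."""
    def __init__(self):
        self._data = {}
        self._guard = threading.Lock()  # makes each single operation atomic

    def get(self, key):
        with self._guard:
            return self._data.get(key)

    def put(self, key, value):
        with self._guard:
            self._data[key] = value

    def replace(self, key, expected, new):
        # Store new only if the current value still equals expected.
        with self._guard:
            if self._data.get(key) == expected:
                self._data[key] = new
                return True
            return False

def increment(cache, key):
    # Retry until no other writer slipped in between get and replace.
    retries = 0
    while True:
        old = cache.get(key)
        if cache.replace(key, old, old + 1):
            return retries
        retries += 1

NTHREADS = 10
NOPS = 1000

def worker(cache):
    for _ in range(NOPS // NTHREADS):
        increment(cache, "hot")

cache = CasMap()
cache.put("hot", 0)
threads = [threading.Thread(target=worker, args=(cache,)) for _ in range(NTHREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("%d = %d" % (cache.get("hot"), NOPS))
```

No update is lost, but every failed replace costs an extra round trip, which fits with replace with check being the slowest correct variant in the results above.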

There is an upcoming Jakarta EE (formerly Java EE) standard for NoSQL databases: Jakarta NoSQL. The reference implementation is Eclipse JNoSQL.

Functionality:

package cache.infinispan.jnosql;

import java.util.HashMap;
import java.util.Map;

import jakarta.nosql.Settings;
import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.infinispan.keyvalue.InfinispanKeyValueConfiguration;

public class TestFunctional {
    private static void test(Map<String,Integer> cache) {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all not supported
        // listener not supported
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new InfinispanKeyValueConfiguration();
        Map<String, Object> props = new HashMap<>();
        props.put("infinispan.host", "localhost"); // necessary to get remote cache instead of local cache
        BucketManagerFactory bmf = cfg.get(Settings.of(props));
        Map<String,Integer> cache = bmf.getMap("functional_cache", String.class, Integer.class);
        test(cache);
        bmf.close();
    }
}

Performance:

package cache.infinispan.jnosql;

import java.util.HashMap;
import java.util.Map;

import jakarta.nosql.Settings;
import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.infinispan.keyvalue.InfinispanKeyValueConfiguration;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Map<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new InfinispanKeyValueConfiguration();
        Map<String, Object> props = new HashMap<>();
        props.put("infinispan.host", "localhost"); // necessary to get remote cache instead of local cache
        BucketManagerFactory bmf = cfg.get(Settings.of(props));
        Map<String,Integer> cache = bmf.getMap("performance_cache", String.class, Integer.class);
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        bmf.close();
    }
}
6020 puts per second
11709 puts per second
12870 puts per second
13333 puts per second
13297 puts per second

Concurrency:

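The JNoSQL API does not currently support any of the concurrency handling paradigms (locking, replace with check, atomic increment), so only the test without handling is shown.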

package cache.infinispan.jnosql;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import jakarta.nosql.Settings;
import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.infinispan.keyvalue.InfinispanKeyValueConfiguration;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private Map<String,Integer> cache;
        public Test1(Map<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(Map<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new InfinispanKeyValueConfiguration();
        Map<String, Object> props = new HashMap<>();
        props.put("infinispan.host", "localhost"); // necessary to get remote cache instead of local cache
        BucketManagerFactory bmf = cfg.get(Settings.of(props));
        Map<String,Integer> cache = bmf.getMap("concurrency_cache", String.class, Integer.class);
        test(cache, "No handling", new Test1(cache));
        bmf.close();
    }
}
No handling : 191 = 10000 (2314 ms)

There is a .NET client library for Infinispan called Infinispan.HotRod available via NuGet.

It is a .NET wrapper around a native library and it is not up to date version-wise.

Functionality:

using System;
using System.Collections.Generic;
using System.Threading;

using Infinispan.HotRod;
using Infinispan.HotRod.Config;
using Infinispan.HotRod.Event;

namespace Cache.Infinispan.Functional
{
    public class Program
    {
        private static void Test(IRemoteCache<string,int?> cache)
        {
            // basic put, get and remove
            cache.Put("v", 123);
            Console.WriteLine(cache.Get("v"));
            cache.Remove("v");
            Console.WriteLine(cache.Get("v"));
            cache.Put("v", 123);
            Console.WriteLine(cache.Get("v"));
            // persistence
            int? counter = cache.Get("counter");
            if (counter == null)
            {
                counter = 0;
            }
            counter++;
            Console.WriteLine(counter);
            cache.Put("counter", counter);
            // list all
            foreach (string key in cache.KeySet())
            {
                Console.WriteLine("{0} = {1}", key, cache.Get(key));
            }
            // listener does not work in 8.2.0 RC2
        }
        public static void Main(string[] args)
        {
            ConfigurationBuilder b = new ConfigurationBuilder();
            b.AddServer().Host("localhost").Port(11222);
            RemoteCacheManager rcm = new RemoteCacheManager(b.Build());
            IRemoteCache<string,int?> cache = rcm.GetCache<string,int?>("functional_cache");
            Test(cache);
            rcm.Stop();
        }
    }
}

Performance:

using System;
using Infinispan.HotRod;
using Infinispan.HotRod.Config;

namespace InfinispanPerformance
{
    public class Program
    {
        private const int N = 10000;
        private static void Test(IRemoteCache<string,int?> cache)
        {
            // put speed
            DateTime dt1 = DateTime.Now;
            for (int i = 0; i < N; i++)
            {
                cache.Put("K#" + (i + 1), i + 1);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} puts per second", N * 1000 / (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int REP = 5;
        public static void Main(string[] args)
        {
            ConfigurationBuilder b = new ConfigurationBuilder();
            b.AddServer().Host("localhost").Port(11222);
            RemoteCacheManager rcm = new RemoteCacheManager(b.Build());
            IRemoteCache<string,int?> cache = rcm.GetCache<string,int?>("performance_cache");
            for (int i = 0; i < REP; i++)
            {
                Test(cache);
            }
            rcm.Stop();
        }
    }
}
11001 puts per second
13003 puts per second
13550 puts per second
13568 puts per second
13623 puts per second

Concurrency:

using System;
using System.Collections.Generic;
using System.Threading;

using Infinispan.HotRod;
using Infinispan.HotRod.Config;
using Infinispan.HotRod.Event;

namespace Cache.Infinispan.Concurrency
{
    public class Program
    {
        private delegate void F(IRemoteCache<string,int?> cache);
        private static void Test1(IRemoteCache<string,int?> cache)
        {
            int? hot;
            lock (cache)  // RemoteCache is not threadsafe
            {
                hot = cache.Get("hot");
            }
            hot = hot + 1;
            lock (cache)  // RemoteCache is not threadsafe
            {
                cache.Put("hot", hot);
            }
        }
        private static void Testn(IRemoteCache<string,int?> cache, int nrep, F action)
        {
            for (int i = 0; i < nrep; i++)
            {
                action(cache);
            }
        }
        private const int NTHREADS = 100;
        private const int NOPS = 10000;
        private static void Test(IRemoteCache<string,int?> cache, string lbl, F action)
        {
            DateTime dt1 = DateTime.Now;
            cache.Put("hot", 0);
            Thread[] t = new Thread[NTHREADS];
            for (int i = 0; i < NTHREADS; i++)
            {
                t[i] = new Thread(() => Testn(cache, NOPS / NTHREADS, action));
            }
            for (int i = 0; i < NTHREADS; i++)
            {
                t[i].Start();
            }
            for (int i = 0; i < NTHREADS; i++)
            {
                t[i].Join();
            }
            int? hot = cache.Get("hot");
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} : {1} = {2} ({3} ms)", lbl, hot, NOPS, (int)((dt2 - dt1).TotalMilliseconds));
        }
        public static void Main(string[] args)
        {
            ConfigurationBuilder b = new ConfigurationBuilder();
            b.AddServer().Host("localhost").Port(11222);
            RemoteCacheManager rcm = new RemoteCacheManager(b.Build());
            IRemoteCache<string,int?> cache = rcm.GetCache<string,int?>("concurrency_cache");
            Test(cache, "No handling", c => Test1(c));
            // nothing available as client library is not complete in 8.2.0 RC2
            rcm.Stop();
        }
    }
}
No handling : 2934 = 10000 (2461 ms)

The Infinispan client library for .NET (8.2.0 RC2 as tested here) is simply not ready for serious usage.

Ignite

Name: Apache Ignite
History: Open source cache server first released in 2015
Platform: Java (should in theory run on any OS with Java)
Server capabilities:
    embedded cache (JVM languages only) and standalone cache server
    cluster support
    persistence support
Client languages:
    client: Java and other JVM languages, C# and other .NET languages, C++
    thin client: Java and other JVM languages, C# and other .NET languages, Python, PHP, C++ etc.
    memcached protocol
    Redis protocol
    SQL via JDBC, ODBC or PDO drivers
    RESTful HTTP API
Optional features:
    list all
    subscribe to changes
    concurrency handling (locking, replace with check, atomic increment)
Namespace: named caches
Other:
    JCache API support
    transaction support

Examples:

Functionality:

package cache.ignite;

import java.util.Collections;
import java.util.Iterator;

import javax.cache.Cache.Entry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class TestFunctional {
    private static void test(IgniteCache<String,Integer> cache, IgniteEvents events) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all (old style)
        Iterator<Entry<String,Integer>> it = cache.iterator();
        while(it.hasNext()) {
            Entry<String,Integer> e = it.next();
            System.out.printf("%s = %d\n", e.getKey(), e.getValue());
        }
        // list all (new style)
        cache.forEach(e -> System.out.printf("%s = %d\n", e.getKey(), e.getValue()));
        // listener does not work for me in 2.7.6
    }
    public static void main(String[] args) throws Exception {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(true);
        cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_REMOVED);
        TcpDiscoveryVmIpFinder ipf = new TcpDiscoveryVmIpFinder();
        ipf.setAddresses(Collections.singletonList("localhost:47500"));
        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipf));
        Ignite cli = Ignition.start(cfg);
        IgniteCache<String,Integer> cache = cli.getOrCreateCache("functional_cache");
        IgniteEvents events = cli.events();
        test(cache, events);
        cli.close();
    }
}

Performance:

package cache.ignite;

import java.util.Collections;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(IgniteCache<String,Integer> cache) throws InterruptedException {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(true);
        TcpDiscoveryVmIpFinder ipf = new TcpDiscoveryVmIpFinder();
        ipf.setAddresses(Collections.singletonList("localhost:47500"));
        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipf));
        Ignite cli = Ignition.start(cfg);
        IgniteCache<String,Integer> cache = cli.getOrCreateCache("performance_cache");
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cli.close();
    }
}
4297 puts per second
6724 puts per second
7117 puts per second
7017 puts per second
7183 puts per second

Concurrency:

package cache.ignite;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private IgniteCache<String,Integer> cache;
        public Test1(IgniteCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test2 implements Runnable {
        private IgniteCache<String,Integer> cache;
        private Lock lck;
        public Test2(IgniteCache<String,Integer> cache, Lock lck) {
            this.cache = cache;
            this.lck = lck;
        }
        @Override
        public void run() {
            lck.lock();
            try {
                int hot = cache.get("hot");
                hot = hot + 1;
                cache.put("hot", hot);
            } finally {
                lck.unlock(); // always release the lock, even if get or put throws
            }
        }
    }
    private static class Test3 implements Runnable {
        private IgniteCache<String,Integer>cache;
        public Test3(IgniteCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static class Test4 implements Runnable {
        private IgniteAtomicLong counter;
        public Test4(IgniteAtomicLong counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            counter.addAndGet(1);
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(IgniteCache<String,Integer> cache, String lbl, Runnable action, IgniteAtomicLong counter) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        if(counter != null) {
            cache.put("hot", (int)counter.get());
        }
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        IgniteConfiguration cfg = new IgniteConfiguration();
        cfg.setClientMode(true);
        TcpDiscoveryVmIpFinder ipf = new TcpDiscoveryVmIpFinder();
        ipf.setAddresses(Collections.singletonList("localhost:47500"));
        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipf));
        Ignite cli = Ignition.start(cfg);
        CacheConfiguration<String,Integer> cc = new CacheConfiguration<String,Integer>();
        cc.setName("concurrency_cache");
        cc.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); // required for locking to work
        IgniteCache<String,Integer> cache = cli.getOrCreateCache(cc);
        Lock lck = cache.lock("hot");
        IgniteAtomicLong counter = cli.atomicLong("superhot", 0, true);
        test(cache, "No handling", new Test1(cache), null);
        test(cache, "Locking", new Test2(cache, lck), null);
        test(cache, "Replace with check", new Test3(cache), null);
        test(cache, "Atomic increment", new Test4(counter), counter);
        cli.close();
    }
}
No handling : 205 = 10000 (1573 ms)
Locking : 10000 = 10000 (6856 ms)
Replace with check : 10000 = 10000 (18311 ms)
Atomic increment : 20000 = 10000 (1077 ms)
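
The low count for "No handling" is the classic lost update problem: two clients read the same value before either writes, so one increment overwrites the other. The sketch below replays that interleaving by hand in a single thread. It is only an illustration under assumptions: a plain java.util.concurrent.ConcurrentHashMap stands in for the Ignite cache, and the class name LostUpdateSketch is made up for this example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration: a local map stands in for the distributed cache.
class LostUpdateSketch {
    // Two clients both read "hot" before either one writes.
    public static int noHandling() {
        ConcurrentMap<String,Integer> cache = new ConcurrentHashMap<>();
        cache.put("hot", 0);
        int a = cache.get("hot");   // client A reads 0
        int b = cache.get("hot");   // client B reads 0
        cache.put("hot", a + 1);    // client A writes 1
        cache.put("hot", b + 1);    // client B writes 1, A's increment is lost
        return cache.get("hot");
    }
    // Same interleaving, but writes use replace with check and retry on failure.
    public static int replaceWithCheck() {
        ConcurrentMap<String,Integer> cache = new ConcurrentHashMap<>();
        cache.put("hot", 0);
        int a = cache.get("hot");   // client A reads 0
        int b = cache.get("hot");   // client B reads 0
        cache.replace("hot", a, a + 1);                 // client A: 0 -> 1 succeeds
        boolean done = cache.replace("hot", b, b + 1);  // client B: fails, value is no longer 0
        while(!done) {
            b = cache.get("hot");                       // re-read current value
            done = cache.replace("hot", b, b + 1);      // retry
        }
        return cache.get("hot");
    }
    public static void main(String[] args) {
        System.out.println("No handling : " + noHandling());
        System.out.println("Replace with check : " + replaceWithCheck());
    }
}
```

The retry loop is also why replace with check gets slow when many threads hit the same key: every failed replace costs an extra get and replace round trip.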

Functionality:

package cache.ignitethin;

import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class TestFunctional {
    private static void test(ClientCache<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all not supported using thin
        // listener not supported using thin
    }
    public static void main(String[] args) throws Exception {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("localhost:10800");
        IgniteClient cli = Ignition.startClient(cfg);
        ClientCache<String,Integer> cache = cli.getOrCreateCache("functional_cache");
        test(cache);
        cli.close();
    }
}

Performance:

package cache.ignitethin;

import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(ClientCache<String,Integer> cache) throws InterruptedException {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("localhost:10800");
        IgniteClient cli = Ignition.startClient(cfg);
        ClientCache<String,Integer> cache = cli.getOrCreateCache("performance_cache");
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cli.close();
    }
}
10080 puts per second
11135 puts per second
11111 puts per second
11160 puts per second
11312 puts per second

Concurrency:

package cache.ignitethin;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private ClientCache<String,Integer> cache;
        public Test1(ClientCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test3 implements Runnable {
        private ClientCache<String,Integer>cache;
        public Test3(ClientCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(ClientCache<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("localhost:10800");
        IgniteClient cli = Ignition.startClient(cfg);
        ClientCache<String,Integer> cache = cli.getOrCreateCache("concurrency_cache");
        test(cache, "No handling", new Test1(cache));
        // locking not supported using thin
        test(cache, "Replace with check", new Test3(cache));
        // atomic increment not supported using thin
        cli.close();
    }
}
No handling : 8400 = 10000 (1983 ms)
Replace with check : 10000 = 10000 (2009 ms)

Ignite supports the standard Java cache API JCache defined in JSR 107 from 2014.

ignite.xml configuration file used:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>localhost:47500</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Functionality:

package cache.jcache;

import java.net.URI;
import java.util.Iterator;

import javax.cache.Cache;
import javax.cache.Cache.Entry;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.spi.CachingProvider;

public class TestFunctional {
    private static class MyListener implements CacheEntryCreatedListener<String,Integer>, CacheEntryRemovedListener<String,Integer> {
        @Override
        public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> itcee) throws CacheEntryListenerException {
            for(CacheEntryEvent<? extends String, ? extends Integer> cee : itcee) {
                System.out.printf("Add : %s = %d\n", cee.getKey(), cee.getValue());
            }
        }
        @Override
        public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> itcee) throws CacheEntryListenerException {
            for(CacheEntryEvent<? extends String, ? extends Integer> cee : itcee) {
                System.out.printf("Remove : %s\n", cee.getKey());
            }
        }
    }
    private static void test(Cache<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all (old style)
        Iterator<Entry<String,Integer>> it = cache.iterator();
        while(it.hasNext()) {
            Entry<String,Integer> e = it.next();
            System.out.printf("%s = %d\n", e.getKey(), e.getValue());
        }
        // list all (new style)
        cache.forEach(e -> System.out.printf("%s = %d\n", e.getKey(), e.getValue()));
        // listener
        CacheEntryListenerConfiguration<String,Integer> celc = new MutableCacheEntryListenerConfiguration<String,Integer>(MyListener::new, null, false, true);
        cache.registerCacheEntryListener(celc);
        cache.put("x", 123);
        cache.remove("x");
        Thread.sleep(10); // ensure sufficient time to call listener 
    }
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("org.apache.ignite.cache.CachingProvider");
        CacheManager cm = cp.getCacheManager(new URI("file:/C:/Work/ignite.xml"), null, null);
        Cache<String,Integer> cache = cm.getCache("functional_cache");
        if(cache == null) {
            cache = cm.createCache("functional_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache);
        cache.close();
        cm.close();
        cp.close();
    }
}

Performance:

package cache.jcache;

import java.net.URI;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Cache<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("org.apache.ignite.cache.CachingProvider");
        CacheManager cm = cp.getCacheManager(new URI("file:/C:/Work/ignite.xml"), null, null);
        Cache<String,Integer> cache = cm.getCache("performance_cache");
        if(cache == null) {
            cache = cm.createCache("performance_cache", new MutableConfiguration<String,Integer>());
        }
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cache.close();
        cm.close();
        cp.close();
    }
}
4130 puts per second
10976 puts per second
15060 puts per second
14970 puts per second
15503 puts per second

Concurrency:

Of the approaches tested above, the JCache API only supports the replace with check paradigm.

package cache.jcache;

import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private Cache<String,Integer> cache;
        public Test1(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test3 implements Runnable {
        private Cache<String,Integer> cache;
        public Test3(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(Cache<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        CachingProvider cp = Caching.getCachingProvider("org.apache.ignite.cache.CachingProvider");
        CacheManager cm = cp.getCacheManager(new URI("file:/C:/Work/ignite.xml"), null, null);
        Cache<String,Integer> cache = cm.getCache("concurrency_cache");
        if(cache == null) {
            cache = cm.createCache("concurrency_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache, "No handling", new Test1(cache));
        test(cache, "Replace with check", new Test3(cache));
        cache.close();
        cm.close();
        cp.close();
    }
}
No handling : 103 = 10000 (3314 ms)
Replace with check : 10000 = 10000 (24079 ms)

Apache.Ignite is available via NuGet.

Functionality:

using System;
using System.Collections.Generic;

using Apache.Ignite.Core;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;

namespace Cache.IgniteThin.Functional
{
    public class Program
    {
        private static void Test(ICacheClient<string,int?> cache)
        {
            // basic put, get and remove
            cache.Put("v", 123);
            Console.WriteLine(cache.Get("v"));
            cache.Remove("v");
            Console.WriteLine(cache.ContainsKey("v") ? cache.Get("v") : null); // non-existing key throws exception - does not return null
            cache.Put("v", 123);
            Console.WriteLine(cache.Get("v"));
            // persistence
            int? counter = cache.ContainsKey("counter") ? cache.Get("counter") : 0; // non-existing key throws exception - does not return null
            counter++;
            Console.WriteLine(counter);
            cache.Put("counter", counter);
            // list all not supported using thin
            // listener not supported using thin
        }
        public static void Main(string[] args)
        {
            IgniteClientConfiguration cfg = new IgniteClientConfiguration { Endpoints = new List<string> { "localhost:10800" } };
            using (IIgniteClient cli = Ignition.StartClient(cfg))
            {
                ICacheClient<string,int?> cache = cli.GetOrCreateCache<string,int?>("functional_cache");    
                Test(cache);
            }
        }
    }
}

Performance:

using System;
using System.Collections.Generic;

using Apache.Ignite.Core;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;

namespace Cache.IgniteThin.Performance
{
    public class Program
    {
        private const int N = 10000;
        private static void Test(ICacheClient<string,int?> cache)
        {
            // put speed
            DateTime dt1 = DateTime.Now;
            for(int i = 0; i < N; i++)
            {
                cache.Put("K#" + (i + 1), i + 1);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} puts per second", N * 1000 / (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int REP = 5;
        public static void Main(string[] args)
        {
            IgniteClientConfiguration cfg = new IgniteClientConfiguration { Endpoints = new List<string> { "localhost:10800" } };
            using (IIgniteClient cli = Ignition.StartClient(cfg))
            {
                ICacheClient<string,int?> cache = cli.GetOrCreateCache<string,int?>("performance_cache");   
                for(int i = 0; i < REP; i++)
                {
                    Test(cache);
                }
            }
        }
    }
}
10917 puts per second
11160 puts per second
11123 puts per second
11261 puts per second
11312 puts per second

Concurrency:

using System;
using System.Collections.Generic;
using System.Threading;

using Apache.Ignite.Core;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;

namespace Cache.IgniteThin.Concurrency
{
    public class Program
    {
        private delegate void F(ICacheClient<string,int?> cache);
        private static void Test1(ICacheClient<string,int?> cache) 
        {
            int? hot = cache.Get("hot");
            hot = hot + 1;
            cache.Put("hot", hot);
        }
        private static void Test3(ICacheClient<string,int?> cache) 
        {
            bool done = false;
            while(!done)
            {
                int? hot1 = cache.Get("hot");
                int? hot2 = hot1 + 1;
                done = cache.Replace("hot", hot1, hot2);
            }
        }
        private static void Testn(ICacheClient<string,int?> cache, int nrep, F action)
        {
            for(int i = 0; i < nrep; i++)
            {
                action(cache);
            }
        }
        private const int NTHREADS = 100;
        private const int NOPS = 10000;
        private static void Test(ICacheClient<string,int?> cache, string lbl, F action)
        {
            DateTime dt1 = DateTime.Now;
            cache.Put("hot", 0);
            Thread[] t = new Thread[NTHREADS];
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i] = new Thread(() => Testn(cache, NOPS/NTHREADS, action));
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Start();
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Join();
            }
            int? hot = cache.Get("hot");
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} : {1} = {2} ({3} ms)", lbl, hot, NOPS, (int)((dt2 - dt1).TotalMilliseconds));
        }
        public static void Main(string[] args)
        {
            IgniteClientConfiguration cfg = new IgniteClientConfiguration { Endpoints = new List<string> { "localhost:10800" } };
            using (IIgniteClient cli = Ignition.StartClient(cfg))
            {
                ICacheClient<string,int?> cache = cli.GetOrCreateCache<string,int?>("concurrency_cache");
                Test(cache, "No handling", c => Test1(c));
                // locking not supported using thin
                Test(cache, "Replace with check", c => Test3(c));
                // atomic increment not supported using thin
            }
        }
    }
}
No handling : 294 = 10000 (14727 ms)
Replace with check : 10000 = 10000 (155667 ms)

The Ignite PHP client is available from GitHub and can be installed with Composer.

Functionality:

<?php
spl_autoload_register(function ($clznam) {
    include $clznam . '.php';
});

function test($cache) {
    // basic put, get and remove
    $cache->put('v', 123);
    echo $cache->get('v') . "\r\n";
    $cache->removeKey('v');
    echo $cache->get('v') . "\r\n";
    $cache->put('v', 123);
    echo $cache->get('v') . "\r\n";
    // persistence
    $counter = $cache->get('counter');
    if($counter == null) {
        $counter = 0;
    }
    $counter++;
    echo $counter . "\r\n";
    $cache->put('counter', $counter);
    // list all not supported using thin
    // listener not supported using thin
}
    
use Apache\Ignite\Client;
use Apache\Ignite\ClientConfiguration;
use Apache\Ignite\Type\ObjectType;

$cli = new Client();
$cli->connect(new ClientConfiguration('localhost:10800'));
$cache = $cli->getOrCreateCache('functional_cache')->setKeyType(ObjectType::STRING)->setValueType(ObjectType::INTEGER);
test($cache);
$cli->disconnect();
?>

Performance:

<?php
spl_autoload_register(function ($clznam) {
    include $clznam . '.php';
});

define('N', 10000);
function test($cache) {
    // put speed
    $t1 = gettimeofday(TRUE);
    for($i = 0; $i < N; $i++) {
        $cache->put('K#' . ($i + 1), $i + 1);
    }
    $t2 = gettimeofday(TRUE);
    echo sprintf("%d puts per second\r\n", (int)(N / ($t2 - $t1)));
}
    
use Apache\Ignite\Client;
use Apache\Ignite\ClientConfiguration;
use Apache\Ignite\Type\ObjectType;

define('REP', 5);
$cli = new Client();
$cli->connect(new ClientConfiguration('localhost:10800'));
$cache = $cli->getOrCreateCache('performance_cache')->setKeyType(ObjectType::STRING)->setValueType(ObjectType::INTEGER);
for($i = 0; $i < REP; $i++) {
    test($cache);
}
$cli->disconnect();
?>
5262 puts per second
5210 puts per second
5251 puts per second
5246 puts per second
5218 puts per second

Concurrency:

<?php
spl_autoload_register(function ($clznam) {
    include $clznam . '.php';
});

define('NTHREADS', 10); // 100 does not work
define('NOPS', 10000);

use pht\Thread;

use Apache\Ignite\Client;
use Apache\Ignite\ClientConfiguration;
use Apache\Ignite\Type\ObjectType;
    
function test1() {
    $cli = new Client();
    $cli->connect(new ClientConfiguration('localhost:10800'));
    $cache = $cli->getOrCreateCache('concurrency_cache')->setKeyType(ObjectType::STRING)->setValueType(ObjectType::INTEGER);
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $hot = $cache->get('hot');
        $hot = $hot + 1;
        $cache->put('hot', $hot);
    }
    $cli->disconnect();
}

function test3() {
    $cli = new Client();
    $cli->connect(new ClientConfiguration('localhost:10800'));
    $cache = $cli->getOrCreateCache('concurrency_cache')->setKeyType(ObjectType::STRING)->setValueType(ObjectType::INTEGER);
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $done = FALSE;
        while(!$done) {
            $hot1 = $cache->get('hot');
            $hot2 = $hot1 + 1;
            $done = $cache->replaceIfEquals('hot', $hot1, $hot2);
        }
    }
    $cli->disconnect();
}

function dispatch($me, $f) {
    $cmd = sprintf('%s -d include_path=%s %s %s', PHP_BINARY, get_include_path(), $me, $f);
    shell_exec($cmd);
}

function test($lbl, $me, $f) {
    $cli = new Client();
    $cli->connect(new ClientConfiguration('localhost:10800'));
    $cache = $cli->getOrCreateCache('concurrency_cache')->setKeyType(ObjectType::STRING)->setValueType(ObjectType::INTEGER);
    $t1 = gettimeofday(TRUE);
    $cache->put('hot', 0);
    $t = array();
    for($i = 0; $i < NTHREADS; $i++) {
        $t[] = new Thread();
        $t[$i]->addFunctionTask('dispatch', $me, $f);
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->start();        
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->join();
    }
    $hot = $cache->get('hot');
    $t2 = gettimeofday(TRUE);
    echo sprintf("%s : %d = %d (%d ms)\r\n", $lbl, $hot, NOPS, (int)(($t2 - $t1) * 1000));
    $cli->disconnect();
}

if(count($argv) < 2) {
    test('No handling', $argv[0], 'test1');
    // locking not supported using thin
    test('Replace with check', $argv[0],'test3');
    // atomic increment not supported using thin
} else {
    $argv[1]();
}
?>

The test code may look a little weird as it is testing concurrency in a CLI app. Real usage would of course be in PHP pages served by a web server.

No handling : 1846 = 10000 (1715 ms)
Replace with check : 10000 = 10000 (5845 ms)

pyignite can be installed via pip.

Functionality:

from pyignite import Client

cli = Client()
cli.connect('localhost', 10800)
cache = cli.get_or_create_cache('functional_cache')

# basic put, get and remove
cache.put('v', 123)
print(cache.get('v'))
cache.remove_keys('v')
print(cache.get('v'))
cache.put('v', 123)
print(cache.get('v'))

# persistence
counter = cache.get('counter')
if counter is None:
    counter = 0
counter = counter + 1
print(counter)                                                                       
cache.put('counter', counter)

# list all not supported using thin

# listener not supported using thin

cli.close()

Performance:

from pyignite import Client
import time

cli = Client()
cli.connect('localhost', 10800)
cache = cli.get_or_create_cache('performance_cache')

# put speed
N = 10000
for j in range(5):
    t1 = time.time()
    for i in range(N):
        cache.put('K#' + str(i + 1), i + 1)
    t2 = time.time()
    print('%d puts per second' % (N / (t2 - t1)))

cli.close()
5512 puts per second
5536 puts per second
5482 puts per second
5440 puts per second
5500 puts per second

Concurrency:

from pyignite import Client
import multiprocessing
import time

NTHREADS = 100
NOPS = 10000

def test1():
    cli = Client()
    cli.connect('localhost', 10800)
    cache = cli.get_or_create_cache('concurrency_cache')
    for i in range(NOPS//NTHREADS):
        hot = cache.get('hot')
        hot = hot + 1
        cache.put('hot', hot)
    cli.close()

def test3():
    cli = Client()
    cli.connect('localhost', 10800)
    cache = cli.get_or_create_cache('concurrency_cache')
    for i in range(NOPS//NTHREADS):
        done = False
        while not done:
            hot1 = cache.get('hot')
            hot2 = hot1 + 1
            done = cache.replace_if_equals('hot', hot1, hot2)
    cli.close()

def test(lbl, f):
    cli = Client()
    cli.connect('localhost', 10800)
    cache = cli.get_or_create_cache('concurrency_cache')
    t1 = time.time()
    cache.put('hot', 0)
    t = []
    for i in range(NTHREADS):
        t.append(multiprocessing.Process(target=f, args=()))
    for i in range(NTHREADS):
        t[i].start()
    for i in range(NTHREADS):
        t[i].join()
    hot = cache.get('hot')
    t2 = time.time()
    print('%s : %d = %d (%d ms)' % (lbl, hot, NOPS, (t2 - t1) * 1000))
    cli.close()

if __name__ == '__main__':
    test('No handling', test1)
    # locking not supported using thin
    test('Replace with check', test3)
    # atomic increment not supported using thin
No handling : 2482 = 10000 (5999 ms)
Replace with check : 10000 = 10000 (16664 ms)

Note that comparing the speed of the Python test with the other languages is misleading, as the Python test is implemented with multiple processes while the tests in the other languages are implemented with threads.

Ignite via SQL:

Ignite is a very feature-rich product.

Among its many features is the possibility of access via SQL.

So how does that work?

Tables are caches.

All primary key fields are mapped to the key.

All non-primary key fields are mapped to the value.

SELECT is mapped to the get function.

MERGE (which is a combined INSERT/UPDATE) is mapped to the put function.

DELETE is mapped to the remove function.

UPDATE is mapped to the replace function.
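The mapping above can be pictured with a small toy model. This is only a sketch under stated assumptions: `ToyTable` and its method names are hypothetical, the cache is a plain Python dict, and nothing here is Ignite API. It just shows which key-value operation each statement corresponds to for a two-column table (ckey, cvalue).

```python
class ToyTable:
    """Hypothetical model of a table (ckey, cvalue) stored as a key-value cache."""

    def __init__(self):
        self.cache = {}  # ckey -> cvalue

    def select(self, ckey):
        # SELECT cvalue FROM t WHERE ckey = ?   -> get
        return self.cache.get(ckey)

    def merge(self, ckey, cvalue):
        # MERGE INTO t(ckey,cvalue) VALUES(?,?) -> put (insert or overwrite)
        self.cache[ckey] = cvalue

    def delete(self, ckey):
        # DELETE FROM t WHERE ckey = ?          -> remove; returns affected rows
        if ckey in self.cache:
            del self.cache[ckey]
            return 1
        return 0

    def update(self, ckey, cvalue):
        # UPDATE t SET cvalue = ? WHERE ckey = ? -> replace (only if the key exists)
        if ckey in self.cache:
            self.cache[ckey] = cvalue
            return 1
        return 0


if __name__ == '__main__':
    t = ToyTable()
    t.merge('v', 123)
    print(t.select('v'))       # 123
    print(t.update('w', 1))    # 0, no such row, nothing is inserted
    print(t.update('v', 124))  # 1
    print(t.delete('v'))       # 1
    print(t.select('v'))       # None
```

The difference between MERGE and UPDATE is the one that matters in the examples: MERGE always writes the row, UPDATE only touches a row that already exists.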

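Concurrency can be handled in two ways: with a transaction (with retry), or with a single UPDATE statement that changes the value in place (with retry).

SQL statements can be executed via two different APIs: the SQL API exposed by the thin client, or a standard database API (JDBC for Java, ODBC via ADO.NET for .NET).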

In the examples I will use tables with just two fields, one key field and one value field, to avoid the mapping of multiple fields.

Examples:

The thin client API exposes a SQL API.

Functionality:

package cache.ignitesql;

import java.util.List;
import java.util.function.Consumer;

import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class TestFunctional {
    private static Integer get(ClientCache<String,Integer> cache, String key) {
        List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT cvalue FROM functional_cache WHERE ckey = ?").setSchema("PUBLIC").setArgs(key)).getAll();
        if(res.size() > 0) {
            return (Integer)res.get(0).get(0);
        } else {
            return null;
        }
    }
    private static void put(ClientCache<String,Integer> cache, String key, Integer value) {
        cache.query(new SqlFieldsQuery("MERGE INTO functional_cache(ckey,cvalue) VALUES(?,?)").setSchema("PUBLIC").setArgs(key, value)).getAll();
    }
    private static void remove(ClientCache<String,Integer> cache, String key) {
        cache.query(new SqlFieldsQuery("DELETE FROM functional_cache WHERE ckey = ?").setSchema("PUBLIC").setArgs(key)).getAll();
    }
    private static void listall(ClientCache<String,Integer> cache, Consumer<List<?>> f) {
        cache.query(new SqlFieldsQuery("SELECT ckey,cvalue FROM functional_cache").setSchema("PUBLIC")).forEach(f);
    }
    private static void test(ClientCache<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        put(cache, "v", 123);
        System.out.println(get(cache, "v"));
        remove(cache, "v");
        System.out.println(get(cache, "v"));
        put(cache, "v", 123);
        System.out.println(get(cache, "v"));
        // persistence
        Integer counter = get(cache, "counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        put(cache, "counter", counter);
        // list all
        listall(cache, row -> { System.out.printf("%s = %d\n",  row.get(0), row.get(1)); });
        // listener not supported using thin
    }
    public static void main(String[] args) throws Exception {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("localhost:10800");
        IgniteClient cli = Ignition.startClient(cfg);
        ClientCache<String,Integer> cache = cli.getOrCreateCache(new ClientCacheConfiguration().setName("dummy"));
        cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS functional_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))").setSchema("PUBLIC")).getAll();
        test(cache);
        cli.close();
    }
}

Performance:

package cache.ignitesql;

import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class TestPerformance {
    private static void put(ClientCache<String,Integer> cache, String key, Integer value) {
        cache.query(new SqlFieldsQuery("MERGE INTO performance_cache(ckey,cvalue) VALUES(?,?)").setSchema("PUBLIC").setArgs(key, value)).getAll();
    }
    private static final int N = 10000;
    private static void test(ClientCache<String,Integer> cache) throws InterruptedException {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            put(cache, "K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("localhost:10800");
        IgniteClient cli = Ignition.startClient(cfg);
        ClientCache<String,Integer> cache = cli.getOrCreateCache(new ClientCacheConfiguration().setName("dummy"));
        cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS performance_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))").setSchema("PUBLIC")).getAll();
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cli.close();
    }
}
4585 puts per second
8084 puts per second
8718 puts per second
9107 puts per second
9041 puts per second

Concurrency:

package cache.ignitesql;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;

public class TestConcurrency {
    private static Integer get(ClientCache<String,Integer> cache, String key) {
        List<List<?>> res = cache.query(new SqlFieldsQuery("SELECT cvalue FROM concurrency_cache WHERE ckey = ?").setSchema("PUBLIC").setArgs(key)).getAll();
        if(res.size() > 0) {
            return (Integer)res.get(0).get(0);
        } else {
            return null;
        }
    }
    private static void put(ClientCache<String,Integer> cache, String key, Integer value) {
        cache.query(new SqlFieldsQuery("MERGE INTO concurrency_cache(ckey,cvalue) VALUES(?,?)").setSchema("PUBLIC").setArgs(key, value)).getAll();
    }
    private static void add(ClientCache<String,Integer> cache, String key, Integer delta) {
        cache.query(new SqlFieldsQuery("UPDATE concurrency_cache SET cvalue = cvalue + ? WHERE ckey = ?").setSchema("PUBLIC").setArgs(delta, key)).getAll();
    }
    private static class Test1 implements Runnable {
        private ClientCache<String,Integer> cache;
        public Test1(ClientCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = get(cache, "hot");
            hot = hot + 1;
            put(cache, "hot", hot);
        }
    }
    private static class Test6 implements Runnable {
        private ClientCache<String,Integer>cache;
        public Test6(ClientCache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                try {
                    add(cache, "hot", 1);
                    done = true;
                } catch (Exception e) {
                    // retry
                }
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(ClientCache<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        put(cache, "hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = get(cache, "hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses("localhost:10800");
        IgniteClient cli = Ignition.startClient(cfg);
        ClientCache<String,Integer> cache = cli.getOrCreateCache(new ClientCacheConfiguration().setName("dummy"));
        cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))").setSchema("PUBLIC")).getAll();
        test(cache, "No handling", new Test1(cache));
        // transaction with retry  not available
        test(cache, "Update with retry", new Test6(cache));
        cli.close();
    }
}
No handling : 4887 = 10000 (2706 ms)
Update with retry : 10000 = 10000 (1683 ms)
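The gap between the two results above comes from where the addition happens: in the client (SELECT, add, MERGE) or inside a single UPDATE statement. The following sketch reproduces that difference in miniature. It is an illustration under loud assumptions: it uses Python's built-in sqlite3 as a stand-in for Ignite, SQLite has no MERGE so `INSERT OR REPLACE` plays that role, and the two clients are interleaved by hand on one connection rather than run as threads.

```python
import sqlite3


def setup():
    # In-memory SQLite database standing in for the Ignite table (assumption).
    con = sqlite3.connect(':memory:')
    con.execute('CREATE TABLE concurrency_cache (ckey VARCHAR(16) NOT NULL, '
                'cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))')
    con.execute("INSERT INTO concurrency_cache(ckey,cvalue) VALUES('hot',0)")
    return con


def get(con, key):
    row = con.execute('SELECT cvalue FROM concurrency_cache WHERE ckey = ?',
                      (key,)).fetchone()
    return row[0] if row else None


def put(con, key, value):
    # SQLite substitute for MERGE INTO ... VALUES(?,?)
    con.execute('INSERT OR REPLACE INTO concurrency_cache(ckey,cvalue) VALUES(?,?)',
                (key, value))


def add(con, key, delta):
    # The increment is computed by the database, not by the client.
    con.execute('UPDATE concurrency_cache SET cvalue = cvalue + ? WHERE ckey = ?',
                (delta, key))


def no_handling():
    con = setup()
    a = get(con, 'hot')    # client A reads 0
    b = get(con, 'hot')    # client B reads 0
    put(con, 'hot', a + 1)
    put(con, 'hot', b + 1)  # overwrites A's result
    return get(con, 'hot')


def single_update():
    con = setup()
    add(con, 'hot', 1)     # client A
    add(con, 'hot', 1)     # client B
    return get(con, 'hot')


if __name__ == '__main__':
    print('No handling   :', no_handling())    # 1
    print('Single UPDATE :', single_update())  # 2
```

The sketch leaves out the retry loop of Test6, which only matters when the statement itself fails under contention.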

Ignite comes with a JDBC driver.

Functionality:

package cache.ignitejdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Consumer;
import java.sql.Statement;

public class TestFunctional {
    private static Integer get(Connection con, String key) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("SELECT cvalue FROM functional_cache WHERE ckey = ?");
        pstmt.setString(1, key);
        ResultSet rs = pstmt.executeQuery();
        Integer res = null;
        if(rs.next()) {
            res = rs.getInt(1);
        }
        rs.close();
        pstmt.close();
        return res;
    }
    private static void put(Connection con, String key, Integer value) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("MERGE INTO functional_cache(ckey,cvalue) VALUES(?,?)");
        pstmt.setString(1, key);
        pstmt.setInt(2, value);
        pstmt.executeUpdate();
        pstmt.close();
    }
    private static void remove(Connection con, String key) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("DELETE FROM functional_cache WHERE ckey = ?");
        pstmt.setString(1, key);
        pstmt.executeUpdate();
        pstmt.close();
    }
    private static void listall(Connection con, Consumer<ResultSet> f) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("SELECT ckey,cvalue FROM functional_cache");
        ResultSet rs = pstmt.executeQuery();
        while(rs.next()) {
            f.accept(rs);
        }
        rs.close();
        pstmt.close();
    }
    private static void test(Connection con) throws InterruptedException, SQLException {
        // basic put, get and remove
        put(con, "v", 123);
        System.out.println(get(con, "v"));
        remove(con, "v");
        System.out.println(get(con, "v"));
        put(con, "v", 123);
        System.out.println(get(con, "v"));
        // persistence
        Integer counter = get(con, "counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        put(con, "counter", counter);
        // list all
        listall(con, (ResultSet rs) -> { try { System.out.println(rs.getString(1) + " = " + rs.getInt(2)); } catch (SQLException e) { e.printStackTrace(); } } );
        // listener not supported using thin
    }
    public static void main(String[] args) throws Exception {
        Connection con = DriverManager.getConnection("jdbc:ignite:thin://localhost:10800");
        Statement stmt = con.createStatement();
        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS functional_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))");
        stmt.close();
        test(con);
        con.close();
    }
}

Performance:

package cache.ignitejdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

public class TestPerformance {
    private static void put(Connection con, String key, Integer value) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("MERGE INTO performance_cache(ckey,cvalue) VALUES(?,?)");
        pstmt.setString(1, key);
        pstmt.setInt(2, value);
        pstmt.executeUpdate();
        pstmt.close();
    }
    private static final int N = 10000;
    private static void test(Connection con) throws InterruptedException, SQLException {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            put(con, "K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        Connection con = DriverManager.getConnection("jdbc:ignite:thin://localhost:10800");
        Statement stmt = con.createStatement();
        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS performance_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))");
        stmt.close();
        for(int i = 0; i < REP; i++) {
            test(con);
        }
        con.close();
    }
}
8203 puts per second
9267 puts per second
9302 puts per second
9259 puts per second
9000 puts per second

Concurrency:

package cache.ignitejdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnection;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
import org.apache.commons.dbcp2.PoolingDriver;
import org.apache.commons.pool2.impl.GenericObjectPool;

public class TestConcurrency {
    private static Integer get(Connection con, String key) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("SELECT cvalue FROM concurrency_cache WHERE ckey = ?");
        pstmt.setString(1, key);
        ResultSet rs = pstmt.executeQuery();
        Integer res = null;
        if(rs.next()) {
            res = rs.getInt(1);
        }
        rs.close();
        pstmt.close();
        return res;
    }
    private static void put(Connection con, String key, Integer value) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("MERGE INTO concurrency_cache(ckey,cvalue) VALUES(?,?)");
        pstmt.setString(1, key);
        pstmt.setInt(2, value);
        pstmt.executeUpdate();
        pstmt.close();
    }
    private static void add(Connection con, String key, Integer delta) throws SQLException {
        PreparedStatement pstmt = con.prepareStatement("UPDATE concurrency_cache SET cvalue = cvalue + ? WHERE ckey = ?");
        pstmt.setInt(1, delta);
        pstmt.setString(2, key);
        pstmt.executeUpdate();
        pstmt.close();
    }
    private static class Test1 implements Runnable {
        @Override
        public void run() {
            try {
                Connection con = DriverManager.getConnection("jdbc:apache:commons:dbcp:ignite");
                int hot = get(con, "hot");
                hot = hot + 1;
                put(con, "hot", hot);
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    private static class Test6 implements Runnable {
        @Override
        public void run() {
            try {
                Connection con = DriverManager.getConnection("jdbc:apache:commons:dbcp:ignite");
                boolean done = false;
                while(!done) {
                    try {
                        add(con, "hot", 1);
                        done = true;
                    } catch (SQLException e) {
                        // retry
                    }
                }
                con.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(Connection con, String lbl, Runnable action) throws InterruptedException, SQLException {
        long t1 = System.currentTimeMillis();
        put(con, "hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = get(con, "hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.commons.dbcp2.PoolingDriver");
        PoolingDriver driver = (PoolingDriver) DriverManager.getDriver("jdbc:apache:commons:dbcp:"); // Ignite Connection is not thread safe and connecting is slow so a connection pool is necessary
        PoolableConnectionFactory pcf = new PoolableConnectionFactory(new DriverManagerConnectionFactory("jdbc:ignite:thin://localhost:10800"), null);
        GenericObjectPool<PoolableConnection> cp = new GenericObjectPool<>(pcf);
        cp.setMinIdle(1);
        cp.setMaxTotal(NTHREADS + 1);
        pcf.setPool(cp);
        driver.registerPool("ignite", cp);
        Connection con = DriverManager.getConnection("jdbc:apache:commons:dbcp:ignite");
        Statement stmt = con.createStatement();
        stmt.executeUpdate("CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))");
        stmt.close();
        test(con, "No handling", new Test1());
        // transaction with retry does not work for my config
        test(con, "Update with retry", new Test6());
        con.close();
        driver.closePool("ignite");
    }
}
No handling : 174 = 10000 (1880 ms)
Update with retry : 10000 = 10000 (1559 ms)

The thin client API exposes a SQL API.

Functionality:

using System;
using System.Collections.Generic;

using Apache.Ignite.Core;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Cache.Query;

namespace Cache.IgniteSql.Functional
{
    public class Program
    {
        public delegate void Processor(IList<object> row);
        private static int? Get(ICacheClient<string,int> cache, String key)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("SELECT cvalue FROM functional_cache WHERE ckey = ?");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { key };
            IList<IList<object>> res = cache.Query(sfq).GetAll();
            if(res.Count > 0) {
                return (int?)res[0][0];
            } else {
                return null;
            }
        }
        private static void Put(ICacheClient<string,int> cache, String key, int? value)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("MERGE INTO functional_cache(ckey,cvalue) VALUES(?,?)");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { key, value };
            cache.Query(sfq).GetAll();
        }
        private static void Remove(ICacheClient<string,int> cache, String key)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("DELETE FROM functional_cache WHERE ckey = ?");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { key };
            cache.Query(sfq).GetAll();
        }
        private static void ListAll(ICacheClient<string,int> cache, Processor f)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("SELECT ckey,cvalue FROM functional_cache");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { };
            foreach(IList<object> row in cache.Query(sfq).GetAll())
            {
                f(row);
            }
        }
        private static void Test(ICacheClient<string,int> cache)
        {
            // basic put, get and remove
            Put(cache, "v", 123);
            Console.WriteLine(Get(cache, "v"));
            Remove(cache, "v");
            Console.WriteLine(Get(cache, "v"));
            Put(cache, "v", 123);
            Console.WriteLine(Get(cache, "v"));
            // persistence
            int? counter = Get(cache, "counter");
            if(counter == null)
            {
                counter = 0;
            }
            counter++;
            Console.WriteLine(counter);
            Put(cache, "counter", counter);
            // list all
            ListAll(cache, row => Console.WriteLine("{0} = {1}", row[0], row[1]));
            // listener not supported using thin
        }
        public static void Main(string[] args)
        {
            IgniteClientConfiguration cfg = new IgniteClientConfiguration { Endpoints = new List<string> { "localhost:10800" } };
            using (IIgniteClient cli = Ignition.StartClient(cfg))
            {
                ICacheClient<string,int> cache = cli.GetOrCreateCache<string,int>("functional_cache");  
                SqlFieldsQuery sfq = new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS functional_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))");
                sfq.Schema = "PUBLIC";
                cache.Query(sfq).GetAll();
                Test(cache);
            }
        }
    }
}

Performance:

using System;
using System.Collections.Generic;

using Apache.Ignite.Core;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Cache.Query;

namespace Cache.IgniteSql.Performance
{
    public class Program
    {
        private static void Put(ICacheClient<string,int> cache, String key, int? value)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("MERGE INTO performance_cache(ckey,cvalue) VALUES(?,?)");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { key, value };
            cache.Query(sfq).GetAll();
        }
        private const int N = 10000;
        private static void Test(ICacheClient<string,int> cache)
        {
            // put speed
            DateTime dt1 = DateTime.Now;
            for(int i = 0; i < N; i++)
            {
                // note: this calls the key-value cache.Put, not the SQL-based Put helper defined above
                cache.Put("K#" + (i + 1), i + 1);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} puts per second", N * 1000 / (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int REP = 5;
        public static void Main(string[] args)
        {
            IgniteClientConfiguration cfg = new IgniteClientConfiguration { Endpoints = new List<string> { "localhost:10800" } };
            using (IIgniteClient cli = Ignition.StartClient(cfg))
            {
                ICacheClient<string,int> cache = cli.GetOrCreateCache<string,int>("performance_cache"); 
                SqlFieldsQuery sfq = new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS performance_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))");
                sfq.Schema = "PUBLIC";
                cache.Query(sfq).GetAll();
                for(int i = 0; i < REP; i++)
                {
                    Test(cache);
                }
            }
        }
    }
}
8474 puts per second
10834 puts per second
11454 puts per second
11467 puts per second
10917 puts per second

Concurrency:

using System;
using System.Collections.Generic;
using System.Threading;

using Apache.Ignite.Core;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Cache.Query;

namespace Cache.IgniteSql.Concurrency
{
    public class Program
    {
        private static int? Get(ICacheClient<string,int> cache, String key)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("SELECT cvalue FROM concurrency_cache WHERE ckey = ?");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { key };
            IList<IList<object>> res = cache.Query(sfq).GetAll();
            if(res.Count > 0) {
                return (int?)res[0][0];
            } else {
                return null;
            }
        }
        private static void Put(ICacheClient<string,int> cache, String key, int? value)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("MERGE INTO concurrency_cache(ckey,cvalue) VALUES(?,?)");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { key, value };
            cache.Query(sfq).GetAll();
        }
        private static void Add(ICacheClient<string,int> cache, String key, int? delta)
        {
            SqlFieldsQuery sfq = new SqlFieldsQuery("UPDATE concurrency_cache SET cvalue = cvalue + ? WHERE ckey = ?");
            sfq.Schema = "PUBLIC";
            sfq.Arguments = new object[] { delta, key };
            cache.Query(sfq).GetAll();
        }
        private delegate void F(ICacheClient<string,int> cache);
        private static void Test1(ICacheClient<string,int> cache) 
        {
            int? hot = Get(cache, "hot");
            hot = hot + 1;
            Put(cache, "hot", hot);
        }
        private static void Test6(ICacheClient<string,int> cache) 
        {
            bool done = false;
            while(!done)
            {
                try
                {
                    Add(cache, "hot", 1);
                    done = true;
                }
                catch(Exception)
                {
                    // retry
                }
            }
        }
        private static void Testn(ICacheClient<string,int> cache, int nrep, F action)
        {
            for(int i = 0; i < nrep; i++)
            {
                action(cache);
            }
        }
        private const int NTHREADS = 100;
        private const int NOPS = 10000;
        private static void Test(ICacheClient<string,int> cache, string lbl, F action)
        {
            DateTime dt1 = DateTime.Now;
            Put(cache, "hot", 0);
            Thread[] t = new Thread[NTHREADS];
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i] = new Thread(() => Testn(cache, NOPS/NTHREADS, action));
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Start();
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Join();
            }
            int? hot = Get(cache, "hot");
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} : {1} = {2} ({3} ms)", lbl, hot, NOPS, (int)((dt2 - dt1).TotalMilliseconds));
        }
        public static void Main(string[] args)
        {
            IgniteClientConfiguration cfg = new IgniteClientConfiguration { Endpoints = new List<string> { "localhost:10800" } };
            using (IIgniteClient cli = Ignition.StartClient(cfg))
            {
                ICacheClient<string,int> cache = cli.GetOrCreateCache<string,int>("concurrency_cache"); 
                SqlFieldsQuery sfq = new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))");
                sfq.Schema = "PUBLIC";
                cache.Query(sfq).GetAll();
                Test(cache, "No handling", c => Test1(c));
                // transaction with retry  not available
                Test(cache, "Update with retry", c => Test6(c));
            }
        }
    }
}
System.Net.Sockets.SocketException: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond

I think the code is correct. There is just something in my setup that can't support this test.

Ignite comes with an ODBC driver, and ADO.NET supports ODBC.

Functionality:

using System;
using System.Collections.Generic;
using System.Data.Odbc;

namespace Cache.IgniteOdbc.Functional
{
    public class Program
    {
        public delegate void Processor(OdbcDataReader rdr);
        private static int? Get(OdbcConnection con, String key)
        {
            using(OdbcCommand cmd = new OdbcCommand("SELECT cvalue FROM functional_cache WHERE ckey = ?", con))
            {
                cmd.Parameters.Add(new OdbcParameter(":key", OdbcType.VarChar, 16));
                cmd.Parameters[":key"].Value = key;
                using(OdbcDataReader rdr = cmd.ExecuteReader())
                {
                    if(rdr.Read())
                    {
                        return (int?)rdr[0];
                    }
                    else
                    {
                        return null;
                    }
                }
            }
        }
        private static void Put(OdbcConnection con, String key, int? value)
        {
            using(OdbcCommand cmd = new OdbcCommand("MERGE INTO functional_cache(ckey,cvalue) VALUES(?,?)", con))
            {
                cmd.Parameters.Add(new OdbcParameter(":key", OdbcType.VarChar, 16));
                cmd.Parameters.Add(new OdbcParameter(":value", OdbcType.Int));
                cmd.Parameters[":key"].Value = key;
                cmd.Parameters[":value"].Value = value;
                cmd.ExecuteNonQuery();
            }
        }
        private static void Remove(OdbcConnection con, String key)
        {
            using(OdbcCommand cmd = new OdbcCommand("DELETE FROM functional_cache WHERE ckey = ?", con))
            {
                cmd.Parameters.Add(new OdbcParameter(":key", OdbcType.VarChar, 16));
                cmd.Parameters[":key"].Value = key;
                cmd.ExecuteNonQuery();
            }
        }
        private static void ListAll(OdbcConnection con, Processor f)
        {
            using(OdbcCommand cmd = new OdbcCommand("SELECT ckey,cvalue FROM functional_cache", con))
            {
                using(OdbcDataReader rdr = cmd.ExecuteReader())
                {
                    while(rdr.Read())
                    {
                        f(rdr);
                    }
                }
                
            }
        }
        private static void Test(OdbcConnection con)
        {
            // basic put, get and remove
            Put(con, "v", 123);
            Console.WriteLine(Get(con, "v"));
            Remove(con, "v");
            Console.WriteLine(Get(con, "v"));
            Put(con, "v", 123);
            Console.WriteLine(Get(con, "v"));
            // persistence
            int? counter = Get(con, "counter");
            if(counter == null)
            {
                counter = 0;
            }
            counter++;
            Console.WriteLine(counter);
            Put(con, "counter", counter);
            // list all
            ListAll(con, rdr => Console.WriteLine("{0} = {1}", rdr[0], rdr[1]));
            // listener not supported using thin
        }
        public static void Main(string[] args)
        {
            using (OdbcConnection con = new OdbcConnection("Dsn=ARNEPC4_IGNITE"))
            {
                con.Open();
                using(OdbcCommand cmd = new OdbcCommand("CREATE TABLE IF NOT EXISTS functional_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))", con))
                {
                    cmd.ExecuteNonQuery();
                }
                Test(con);
            }
        }
    }
}

Performance:

using System;
using System.Collections.Generic;
using System.Data.Odbc;

namespace Cache.IgniteOdbc.Performance
{
    public class Program
    {
        private static void Put(OdbcConnection con, String key, int? value)
        {
            using(OdbcCommand cmd = new OdbcCommand("MERGE INTO performance_cache(ckey,cvalue) VALUES(?,?)", con))
            {
                cmd.Parameters.Add(new OdbcParameter(":key", OdbcType.VarChar, 16));
                cmd.Parameters.Add(new OdbcParameter(":value", OdbcType.Int));
                cmd.Parameters[":key"].Value = key;
                cmd.Parameters[":value"].Value = value;
                cmd.ExecuteNonQuery();
            }
        }
        private const int N = 10000;
        private static void Test(OdbcConnection con)
        {
            // put speed
            DateTime dt1 = DateTime.Now;
            for(int i = 0; i < N; i++)
            {
                Put(con, "K#" + (i + 1), i + 1);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} puts per second", N * 1000 / (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int REP = 5;
        public static void Main(string[] args)
        {
            using (OdbcConnection con = new OdbcConnection("Dsn=ARNEPC4_IGNITE"))
            {
                con.Open();
                using(OdbcCommand cmd = new OdbcCommand("CREATE TABLE IF NOT EXISTS performance_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))", con))
                {
                    cmd.ExecuteNonQuery();
                }
                for(int i = 0; i < REP; i++)
                {
                    Test(con);
                }
            }
        }
    }
}
3947 puts per second
3838 puts per second
3924 puts per second
4133 puts per second
4030 puts per second

Concurrency:

using System;
using System.Threading;
using System.Data.Odbc;

namespace Cache.IgniteOdbc.Concurrency
{
    public class Program
    {
        private static int? Get(OdbcConnection con, String key)
        {
            using(OdbcCommand cmd = new OdbcCommand("SELECT cvalue FROM concurrency_cache WHERE ckey = ?", con))
            {
                cmd.Parameters.Add(new OdbcParameter(":key", OdbcType.VarChar, 16));
                cmd.Parameters[":key"].Value = key;
                using(OdbcDataReader rdr = cmd.ExecuteReader())
                {
                    if(rdr.Read())
                    {
                        return (int?)rdr[0];
                    }
                    else
                    {
                        return null;
                    }
                }
            }
        }
        private static void Put(OdbcConnection con, String key, int? value)
        {
            using(OdbcCommand cmd = new OdbcCommand("MERGE INTO concurrency_cache(ckey,cvalue) VALUES(?,?)", con))
            {
                cmd.Parameters.Add(new OdbcParameter(":key", OdbcType.VarChar, 16));
                cmd.Parameters.Add(new OdbcParameter(":value", OdbcType.Int));
                cmd.Parameters[":key"].Value = key;
                cmd.Parameters[":value"].Value = value;
                cmd.ExecuteNonQuery();
            }
        }
        private static void Add(OdbcConnection con, String key, int? delta)
        {
            using(OdbcCommand cmd = new OdbcCommand("UPDATE concurrency_cache SET cvalue = cvalue + ? WHERE ckey = ?", con))
            {
                cmd.Parameters.Add(new OdbcParameter(":value", OdbcType.Int));
                cmd.Parameters.Add(new OdbcParameter(":key", OdbcType.VarChar, 16));
                cmd.Parameters[":value"].Value = delta;
                cmd.Parameters[":key"].Value = key;
                cmd.ExecuteNonQuery();
            }
        }
        private delegate void F(OdbcConnection con);
        private static void Test1(OdbcConnection con) 
        {
            int? hot = Get(con, "hot");
            hot = hot + 1;
            Put(con, "hot", hot);
        }
        private static void Test6(OdbcConnection con) 
        {
            bool done = false;
            while(!done)
            {
                try
                {
                    Add(con, "hot", 1);
                    done = true;
                }
                catch
                {
                    // retry
                }
            }
        }
        private static void Testn(int nrep, F action)
        {
            using (OdbcConnection con = new OdbcConnection("Dsn=ARNEPC4_IGNITE"))
            {
                con.Open();
                for(int i = 0; i < nrep; i++)
                {
                    action(con);
                }
            }
        }
        private const int NTHREADS = 100;
        private const int NOPS = 10000;
        private static void Test(OdbcConnection con, string lbl, F action)
        {
            DateTime dt1 = DateTime.Now;
            Put(con, "hot", 0);
            Thread[] t = new Thread[NTHREADS];
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i] = new Thread(() => Testn(NOPS/NTHREADS, action));
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Start();
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Join();
            }
            int? hot = Get(con, "hot");
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} : {1} = {2} ({3} ms)", lbl, hot, NOPS, (int)((dt2 - dt1).TotalMilliseconds));
        }
        public static void Main(string[] args)
        {
            using (OdbcConnection con = new OdbcConnection("Dsn=ARNEPC4_IGNITE"))
            {
                con.Open();
                using(OdbcCommand cmd = new OdbcCommand("CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))", con))
                {
                    cmd.ExecuteNonQuery();
                }
                Test(con, "No handling", c => Test1(c));
                // transaction with retry does not work for my config
                Test(con, "Update with retry", c => Test6(c));
            }
        }
    }
}
No handling : 224 = 10000 (2756 ms)
Update with retry : 10000 = 10000 (4317 ms)
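
The difference between the two results can be reproduced without Ignite. The following is a minimal Java sketch, assuming an in-memory map stands in for the cache table; the class LostUpdateSketch and its methods are hypothetical and not part of any Ignite API. It shows why separate get and put calls lose updates, while an addition carried out as one atomic step does not.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the cache table; not Ignite code.
class LostUpdateSketch {
    private final ConcurrentHashMap<String, Integer> table = new ConcurrentHashMap<>();

    // "No handling": get and put are two separate steps, so another thread
    // can write between them and that increment is overwritten.
    void readModifyWrite(String key) {
        int value = table.get(key);
        table.put(key, value + 1);
    }

    // "Update": the addition is carried out as a single atomic step,
    // comparable to UPDATE ... SET cvalue = cvalue + ?.
    void atomicAdd(String key, int delta) {
        table.merge(key, delta, Integer::sum);
    }

    // Runs nops increments of "hot" on nthreads threads and returns the final value.
    static int run(int nthreads, int nops, boolean atomic) {
        LostUpdateSketch cache = new LostUpdateSketch();
        cache.table.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(nthreads);
        for (int i = 0; i < nops; i++) {
            es.submit(() -> {
                if (atomic) {
                    cache.atomicAdd("hot", 1);
                } else {
                    cache.readModifyWrite("hot");
                }
            });
        }
        es.shutdown();
        try {
            if (!es.awaitTermination(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timeout waiting for tasks");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return cache.table.get("hot");
    }

    public static void main(String[] args) {
        System.out.println("No handling : " + run(100, 10000, false) + " = 10000");
        System.out.println("Atomic add : " + run(100, 10000, true) + " = 10000");
    }
}
```

The atomic variant always ends at the number of operations. The unprotected variant usually ends lower, and how much lower varies from run to run.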

The PHP thin client API exposes the SQL API.

Functionality:

<?php
spl_autoload_register(function ($clznam) {
    include $clznam . '.php';
});

use Apache\Ignite\Client;
use Apache\Ignite\ClientConfiguration;
use Apache\Ignite\Query\SqlFieldsQuery;
use Apache\Ignite\Cache\CacheConfiguration;
    
function get($cache, $key) {
    $res = $cache->query((new SqlFieldsQuery('SELECT cvalue FROM functional_cache WHERE ckey = ?'))->setSchema('PUBLIC')->setArgs($key))->getAll();
    if(count($res) > 0) {
        return $res[0][0];
    } else {
        return null;
    }
}

function put($cache, $key, $value) {
    $cache->query((new SqlFieldsQuery('MERGE INTO functional_cache(ckey,cvalue) VALUES(?,?)'))->setSchema('PUBLIC')->setArgs($key, $value))->getAll();
}

function remove($cache, $key) {
    $cache->query((new SqlFieldsQuery('DELETE FROM functional_cache WHERE ckey = ?'))->setSchema('PUBLIC')->setArgs($key))->getAll();
}

function listall($cache, $f) {
    $res = $cache->query((new SqlFieldsQuery('SELECT ckey,cvalue FROM functional_cache'))->setSchema('PUBLIC'));
    foreach($res as $row) {
        $f($row);
    }
}

function test($cache) {
    // basic put, get and remove
    put($cache, 'v', 123);
    echo get($cache, 'v') . "\r\n";
    remove($cache, 'v');
    echo get($cache, 'v') . "\r\n";
    put($cache, 'v', 123);
    echo get($cache, 'v') . "\r\n";
    // persistence
    $counter = get($cache, 'counter');
    if($counter == null) {
        $counter = 0;
    }
    $counter++;
    echo $counter . "\r\n";
    put($cache, 'counter', $counter);
    // list all
    listall($cache, function($row) { echo $row[0] . ' = ' . $row[1] . "\r\n"; });
    // listener not supported
}
    
$cli = new Client();
$cli->connect(new ClientConfiguration('localhost:10800'));
$cache = $cli->getOrCreateCache('dummy', (new CacheConfiguration()));
$cache->query((new SqlFieldsQuery('CREATE TABLE IF NOT EXISTS functional_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))'))->setSchema('PUBLIC'))->getAll();
test($cache);
$cli->disconnect();
?>

Performance:

<?php
spl_autoload_register(function ($clznam) {
    include $clznam . '.php';
});

use Apache\Ignite\Client;
use Apache\Ignite\ClientConfiguration;
use Apache\Ignite\Query\SqlFieldsQuery;
use Apache\Ignite\Cache\CacheConfiguration;

function put($cache, $key, $value) {
    $cache->query((new SqlFieldsQuery('MERGE INTO performance_cache(ckey,cvalue) VALUES(?,?)'))->setSchema('PUBLIC')->setArgs($key, $value))->getAll();
}
    
define('N', 10000);
function test($cache) {
    // put speed
    $t1 = gettimeofday(TRUE);
    for($i = 0; $i < N; $i++) {
        put($cache, 'K#' . ($i + 1), $i + 1);
    }
    $t2 = gettimeofday(TRUE);
    echo sprintf("%d puts per second\r\n", (int)(N / ($t2 - $t1)));
}
    
define('REP', 5);
$cli = new Client();
$cli->connect(new ClientConfiguration('localhost:10800'));
$cache = $cli->getOrCreateCache('dummy', (new CacheConfiguration()));
$cache->query((new SqlFieldsQuery('CREATE TABLE IF NOT EXISTS performance_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))'))->setSchema('PUBLIC'))->getAll();
for($i = 0; $i < REP; $i++) {
    test($cache);
}
$cli->disconnect();
?>
2765 puts per second
2754 puts per second
2797 puts per second
2772 puts per second
2759 puts per second

Concurrency:

<?php
spl_autoload_register(function ($clznam) {
    include $clznam . '.php';
});

use pht\Thread;

use Apache\Ignite\Client;
use Apache\Ignite\ClientConfiguration;
use Apache\Ignite\Query\SqlFieldsQuery;
use Apache\Ignite\Cache\CacheConfiguration;

function get($cache, $key) {
    $res = $cache->query((new SqlFieldsQuery('SELECT cvalue FROM concurrency_cache WHERE ckey = ?'))->setSchema('PUBLIC')->setArgs($key))->getAll();
    if(count($res) > 0) {
        return $res[0][0];
    } else {
        return null;
    }
}

function put($cache, $key, $value) {
    $cache->query((new SqlFieldsQuery('MERGE INTO concurrency_cache(ckey,cvalue) VALUES(?,?)'))->setSchema('PUBLIC')->setArgs($key, $value))->getAll();
}

function add($cache, $key, $delta) {
    $cache->query((new SqlFieldsQuery('UPDATE concurrency_cache SET cvalue = cvalue + ? WHERE ckey = ?'))->setSchema('PUBLIC')->setArgs($delta, $key))->getAll();
}

define('NTHREADS', 10); // 100 does not work
define('NOPS', 10000);

function test1() {
    $cli = new Client();
    $cli->connect(new ClientConfiguration('localhost:10800'));
    $cache = $cli->getOrCreateCache('dummy', (new CacheConfiguration()));
    $cache->query((new SqlFieldsQuery('CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))'))->setSchema('PUBLIC'))->getAll();
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $hot = get($cache, 'hot');
        $hot = $hot + 1;
        put($cache, 'hot', $hot);
    }
    $cli->disconnect();
}

function test6() {
    $cli = new Client();
    $cli->connect(new ClientConfiguration('localhost:10800'));
    $cache = $cli->getOrCreateCache('dummy', (new CacheConfiguration()));
    $cache->query((new SqlFieldsQuery('CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))'))->setSchema('PUBLIC'))->getAll();
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $done = FALSE;
        while(!$done) {
            try {
                add($cache, 'hot', 1);
                $done = TRUE;
            } catch(Exception $ex) {
                // retry
            }
        }
    }
    $cli->disconnect();
}

function dispatch($me, $f) {
    $cmd = sprintf('%s -d include_path=%s %s %s', PHP_BINARY, get_include_path(), $me, $f);
    shell_exec($cmd);
}

function test($lbl, $me, $f) {
    $cli = new Client();
    $cli->connect(new ClientConfiguration('localhost:10800'));
    $cache = $cli->getOrCreateCache('dummy', (new CacheConfiguration()));
    $cache->query((new SqlFieldsQuery('CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))'))->setSchema('PUBLIC'))->getAll();
    $t1 = gettimeofday(TRUE);
    put($cache, 'hot', 0);
    $t = array();
    for($i = 0; $i < NTHREADS; $i++) {
        $t[] = new Thread();
        $t[$i]->addFunctionTask('dispatch', $me, $f);
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->start();        
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->join();
    }
    $hot = get($cache, 'hot');
    $t2 = gettimeofday(TRUE);
    echo sprintf("%s : %d = %d (%d ms)\r\n", $lbl, $hot, NOPS, (int)(($t2 - $t1) * 1000));
    $cli->disconnect();
}

if(count($argv) < 2) {
    test('No handling', $argv[0], 'test1');
    // transaction with retry not available
    test('Update with retry', $argv[0], 'test6');
} else {
    $argv[1]();
}
?>
No handling : 2308 = 10000 (5172 ms)
Update with retry : 10000 = 10000 (2864 ms)

Ignite comes with an ODBC driver, and PDO supports ODBC.

Functionality:

<?php

function get($con, $key) {
    $stmt = $con->prepare('SELECT cvalue FROM functional_cache WHERE ckey = ?');
    $stmt->execute(array($key));
    if($row = $stmt->fetch()) {
        return $row[0];
    } else {
        return null;
    }
}

function put($con, $key, $value) {
    $stmt = $con->prepare('MERGE INTO functional_cache(ckey,cvalue) VALUES(?,?)');
    $stmt->execute(array($key, $value));
}

function remove($con, $key) {
    $stmt = $con->prepare('DELETE FROM functional_cache WHERE ckey = ?');
    $stmt->execute(array($key));
}

function listall($con, $f) {
    $stmt = $con->prepare('SELECT ckey,cvalue FROM functional_cache');
    $stmt->execute(array());
    while($row = $stmt->fetch()) {
        $f($row);
    }
}

function test($con) {
    // basic put, get and remove
    put($con, 'v', 123);
    echo get($con, 'v') . "\r\n";
    remove($con, 'v');
    echo get($con, 'v') . "\r\n";
    put($con, 'v', 123);
    echo get($con, 'v') . "\r\n";
    // persistence
    $counter = get($con, 'counter');
    if($counter == null) {
        $counter = 0;
    }
    $counter++;
    echo $counter . "\r\n";
    put($con, 'counter', $counter);
    // list all
    listall($con, function($row) { echo $row[0] . ' = ' . $row[1] . "\r\n"; });
    // listener not supported
}
    
$con = new PDO('odbc:ARNEPC4_IGNITE');
$con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$con->query('CREATE TABLE IF NOT EXISTS functional_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))');
test($con);
?>

Performance:

<?php
function put($con, $key, $value) {
    $stmt = $con->prepare('MERGE INTO performance_cache(ckey,cvalue) VALUES(?,?)');
    $stmt->execute(array($key, $value));
}

define('N', 10000);
function test($con) {
    // put speed
    $t1 = gettimeofday(TRUE);
    for($i = 0; $i < N; $i++) {
        put($con, 'K#' . ($i + 1), $i + 1);
    }
    $t2 = gettimeofday(TRUE);
    echo sprintf("%d puts per second\r\n", (int)(N / ($t2 - $t1)));
}
    
define('REP', 5);
$con = new PDO('odbc:ARNEPC4_IGNITE');
$con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$con->query('CREATE TABLE IF NOT EXISTS performance_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))');
for($i = 0; $i < REP; $i++) {
    test($con);
}
?>
7183 puts per second
7272 puts per second
7325 puts per second
7357 puts per second
7122 puts per second

Concurrency:

<?php

function get($con, $key) {
    $stmt = $con->prepare('SELECT cvalue FROM concurrency_cache WHERE ckey = ?');
    $stmt->execute(array($key));
    if($row = $stmt->fetch()) {
        return $row[0];
    } else {
        return null;
    }
}

function put($con, $key, $value) {
    $stmt = $con->prepare('MERGE INTO concurrency_cache(ckey,cvalue) VALUES(?,?)');
    $stmt->execute(array($key, $value));
}

function update($con, $key, $delta) {
    $stmt = $con->prepare('UPDATE concurrency_cache SET cvalue = cvalue + ? WHERE ckey = ?');
    $stmt->execute(array($delta, $key));
}

define('NTHREADS', 10); // 100 does not work
define('NOPS', 10000);

use pht\Thread;

function test1() {
    $con = new PDO('odbc:ARNEPC4_IGNITE');
    $con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $hot = get($con, 'hot');
        $hot = $hot + 1;
        put($con, 'hot', $hot);
    }
}

function test6() {
    $con = new PDO('odbc:ARNEPC4_IGNITE');
    $con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $done = FALSE;
        while(!$done) {
            try {
                update($con, 'hot', 1);
                $done = TRUE;
            } catch (PDOException $e) {
                // retry
            }
        }
    }
}

function dispatch($me, $f) {
    $cmd = sprintf('%s -d include_path=%s %s %s', PHP_BINARY, get_include_path(), $me, $f);
    shell_exec($cmd);
}

function test($lbl, $me, $f) {
    $con = new PDO('odbc:ARNEPC4_IGNITE');
    $con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    $con->query('CREATE TABLE IF NOT EXISTS concurrency_cache (ckey VARCHAR(16) NOT NULL, cvalue INTEGER NOT NULL, PRIMARY KEY(ckey))');
    $t1 = gettimeofday(TRUE);
    put($con, 'hot', 0);
    $t = array();
    for($i = 0; $i < NTHREADS; $i++) {
        $t[] = new Thread();
        $t[$i]->addFunctionTask('dispatch', $me, $f);
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->start();        
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->join();
    }
    $hot = get($con, 'hot');
    $t2 = gettimeofday(TRUE);
    echo sprintf("%s : %d = %d (%d ms)\r\n", $lbl, $hot, NOPS, (int)(($t2 - $t1) * 1000));
}

if(count($argv) < 2) {
    test('No handling', $argv[0], 'test1');
    // transaction with retry does not work for my config
    test('Update with retry', $argv[0],'test6'); // this does not work for me (server side exception does not trigger a client side exception)
} else {
    $argv[1]();
}
?>
No handling : 1623 = 10000 (2255 ms)
Update with retry : 9943 = 10000 (1350 ms)

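Note the incorrect result for "Update with retry" (9943 instead of 10000). As the comment in the code says, a server side exception does not trigger a client side exception here, so failed updates are never retried.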

memcached

Name: memcached
History: Open source cache server first released in 2003
Platform: Linux, FreeBSD, macOS, Windows etc. (native C application)
Server capabilities: standalone cache server; cluster support
Client languages: PHP, Python, C#, Java, C, C++ etc.
Optional features: concurrency handling (replace with check, atomic increment)
Namespace: N/A
Other: very high performance; available as managed service in AWS, Azure, GCP and Bluemix

Widely used, including by well known web sites like Facebook, Twitter, YouTube and Wikipedia.

Examples:

The example uses the Spy memcached library.

Functionality:

package cache.memcached;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.ConnectionFactoryBuilder.Protocol;
import net.spy.memcached.MemcachedClient;

public class TestFunctional {
    private static void test(MemcachedClient cache) throws InterruptedException {
        // basic put, get and remove
        cache.set("v", 0, 123);
        System.out.println(cache.get("v"));
        cache.delete("v");
        System.out.println(cache.get("v"));
        cache.set("v", 0, 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = (Integer)cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.set("counter", 0, counter);
        // list all not supported
        // listener not supported
    }
    public static void main(String[] args) throws Exception {
        MemcachedClient cache = new MemcachedClient(new ConnectionFactoryBuilder().setProtocol(Protocol.BINARY).build(), AddrUtil.getAddresses("localhost:11211"));
        test(cache);
        cache.shutdown();
    }
}

Performance:

package cache.memcached;

import java.util.concurrent.TimeUnit;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.ConnectionFactoryBuilder.Protocol;
import net.spy.memcached.MemcachedClient;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(MemcachedClient cache) throws InterruptedException {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.set("K#" + (i + 1), 0, i + 1);
        }
        cache.waitForQueues(100, TimeUnit.MILLISECONDS); // needed to sync
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        MemcachedClient cache = new MemcachedClient(new ConnectionFactoryBuilder().setProtocol(Protocol.BINARY).build(), AddrUtil.getAddresses("localhost:11211"));
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cache.shutdown();
    }
}
53475 puts per second
94339 puts per second
84033 puts per second
101010 puts per second
95238 puts per second

Concurrency:

package cache.memcached;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.ConnectionFactoryBuilder.Protocol;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private MemcachedClient cache;
        public Test1(MemcachedClient cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = (Integer)cache.get("hot");
            hot = hot + 1;
            cache.set("hot", 0, hot);
        }
    }
    private static class Test3 implements Runnable {
        private MemcachedClient cache;
        public Test3(MemcachedClient cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            while(true) {
                CASValue<Object> res = cache.gets("hot");
                int hot = (Integer)res.getValue();
                hot = hot + 1;
                if(cache.cas("hot", res.getCas(), hot) == CASResponse.OK) break;
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(MemcachedClient cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.set("hot", 0, 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = (Integer)cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        MemcachedClient cache = new MemcachedClient(new ConnectionFactoryBuilder().setProtocol(Protocol.BINARY).build(), AddrUtil.getAddresses("localhost:11211"));
        test(cache, "No handling", new Test1(cache));
        test(cache, "Replace with check", new Test3(cache));
        // atomic increment does not work in 2.12.3
        cache.shutdown();
    }
}
No handling : 96 = 10000 (370 ms)
Replace with check : 10000 = 10000 (7806 ms)
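
The replace with check paradigm can be illustrated without a memcached server. The following is a minimal Java sketch, assuming a single in-memory entry with a version token; the class CasSketch and its members are hypothetical and only mimic the gets/cas pair used above, they are not the Spy memcached API. The retry counter gives a hint why this paradigm is slower under heavy contention on a single key.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory model of one cache entry; not the Spy memcached API.
class CasSketch {
    // A value together with the version token handed out by gets().
    static final class Versioned {
        final int value;
        final long cas;
        Versioned(int value, long cas) {
            this.value = value;
            this.cas = cas;
        }
    }

    private Versioned current = new Versioned(0, 0);
    final AtomicLong retries = new AtomicLong();

    synchronized Versioned gets() {
        return current;
    }

    // Store newValue only if nobody has written since the caller's gets().
    synchronized boolean cas(long expectedCas, int newValue) {
        if (current.cas != expectedCas) {
            return false;
        }
        current = new Versioned(newValue, current.cas + 1);
        return true;
    }

    // Replace with check: read, compute, try to store, start over on conflict.
    void increment() {
        while (true) {
            Versioned seen = gets();
            if (cas(seen.cas, seen.value + 1)) {
                return;
            }
            retries.incrementAndGet();
        }
    }

    // Runs nops increments on nthreads threads and returns the entry.
    static CasSketch run(int nthreads, int nops) {
        CasSketch entry = new CasSketch();
        ExecutorService es = Executors.newFixedThreadPool(nthreads);
        for (int i = 0; i < nops; i++) {
            es.submit(entry::increment);
        }
        es.shutdown();
        try {
            if (!es.awaitTermination(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timeout waiting for tasks");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return entry;
    }

    public static void main(String[] args) {
        CasSketch entry = run(100, 10000);
        System.out.println("Replace with check : " + entry.gets().value + " = 10000");
        System.out.println("Retries : " + entry.retries.get());
    }
}
```

Every failed check costs an extra read and write attempt, so the final value is correct but the elapsed time grows with the number of conflicts.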

There is an upcoming Jakarta EE standard for NoSQL databases: Jakarta NoSQL. The reference implementation is Eclipse JNoSQL.

The JNoSQL memcached driver uses Spy.

Functionality:

package cache.memcached.jnosql;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import jakarta.nosql.Settings;
import jakarta.nosql.Value;
import jakarta.nosql.keyvalue.BucketManager;
import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.memcached.keyvalue.MemcachedKeyValueConfiguration;

public class TestFunctional {
    private static void test(BucketManager cache) {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println((Integer)cache.get("v").get().get());
        cache.delete("v");
        Optional<Value> temp;
        temp = cache.get("v");
        System.out.println(temp.isPresent() ? (Integer)temp.get().get() : "(not exist)");
        cache.put("v", 123);
        System.out.println((Integer)cache.get("v").get().get());
        // persistence
        Integer counter;
        temp = cache.get("counter");
        if(temp.isPresent()) {
            counter = (Integer)temp.get().get();
        } else {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all not supported
        // listener not supported
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new MemcachedKeyValueConfiguration();
        Map<String, Object> props = new HashMap<>();
        props.put("memcached.host", "localhost:11211"); // necessary
        BucketManagerFactory bmf = cfg.get(Settings.of(props));
        BucketManager cache = bmf.getBucketManager("functional_cache"); // Map not supported
        test(cache);
        bmf.close();
    }
}

Performance:

package cache.memcached.jnosql;

import java.util.HashMap;
import java.util.Map;

import jakarta.nosql.Settings;
import jakarta.nosql.keyvalue.BucketManager;
import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.memcached.keyvalue.MemcachedKeyValueConfiguration;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(BucketManager cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new MemcachedKeyValueConfiguration();
        Map<String, Object> props = new HashMap<>();
        props.put("memcached.host", "localhost:11211"); // necessary
        BucketManagerFactory bmf = cfg.get(Settings.of(props));
        BucketManager cache = bmf.getBucketManager("functional_cache"); // Map not supported
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        bmf.close();
    }
}
68027 puts per second
109890 puts per second
434782 puts per second
416666 puts per second
192307 puts per second
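
The figures above are computed as N * 1000 divided by the elapsed milliseconds using integer arithmetic. With only 10000 operations the elapsed time is short: 434782 puts per second corresponds to an elapsed time of 23 ms, so a difference of a single millisecond changes the result noticeably, and an elapsed time of 0 ms would cause a division by zero. The following is a minimal Java sketch of a finer grained calculation, assuming System.nanoTime is used as clock; the class ThroughputSketch and its methods are hypothetical helpers and not part of any cache library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the names are not part of any cache library.
class ThroughputSketch {
    // Operations per second from an operation count and an elapsed time in nanoseconds.
    static double perSecond(long ops, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        return ops * 1_000_000_000.0 / elapsedNanos;
    }

    // Times n invocations of op with the nanosecond clock.
    static double measure(int n, Runnable op) {
        long t1 = System.nanoTime();
        for (int i = 0; i < n; i++) {
            op.run();
        }
        long t2 = System.nanoTime();
        // guard against a zero interval on very fast runs
        return perSecond(n, Math.max(1, t2 - t1));
    }

    public static void main(String[] args) {
        // a plain HashMap stands in for the cache in this sketch
        Map<String, Integer> map = new HashMap<>();
        int[] next = { 0 };
        double rate = measure(10000, () -> {
            next[0]++;
            map.put("K#" + next[0], next[0]);
        });
        System.out.printf("%.0f puts per second%n", rate);
    }
}
```

Increasing the number of operations per measurement reduces the influence of timer granularity in the same way.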

Concurrency:

The JNoSQL API does not currently support any of the concurrency handling paradigms, so only the case without handling is shown.

package cache.memcached.jnosql;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import jakarta.nosql.Settings;
import jakarta.nosql.keyvalue.BucketManager;
import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.memcached.keyvalue.MemcachedKeyValueConfiguration;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private BucketManager cache;
        public Test1(BucketManager cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = (Integer)cache.get("hot").get().get();
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(BucketManager cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = (Integer)cache.get("hot").get().get();
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new MemcachedKeyValueConfiguration();
        Map<String, Object> props = new HashMap<>();
        props.put("memcached.host", "localhost:11211"); // necessary
        BucketManagerFactory bmf = cfg.get(Settings.of(props));
        BucketManager cache = bmf.getBucketManager("functional_cache"); // Map not supported
        test(cache, "No handling", new Test1(cache));
        bmf.close();
    }
}
No handling : 100 = 10000 (457 ms)

The example uses the Enyim.Caching library, available via NuGet.

Functionality:

using System;

using Enyim.Caching;
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached;

namespace Cache.Memcached.Functional
{
    public class Program
    {
        private static void Test(MemcachedClient cache)
        {
            // basic put, get and remove
            cache.Store(StoreMode.Set, "v", 123);
            Console.WriteLine(cache.Get<int?>("v"));
            cache.Remove("v");
            Console.WriteLine(cache.Get<int?>("v"));
            cache.Store(StoreMode.Set, "v", 123);
            Console.WriteLine(cache.Get<int?>("v"));
            // persistence
            int? counter = cache.Get<int?>("counter");
            if(counter == null)
            {
                counter = 0;
            }
            counter++;
            Console.WriteLine(counter);
            cache.Store(StoreMode.Set, "counter", counter);
            // list all not supported
            // listener not supported
        }
        public static void Main(string[] args)
        {
            MemcachedClientConfiguration cfg = new MemcachedClientConfiguration();
            cfg.Protocol = MemcachedProtocol.Binary;
            cfg.AddServer("localhost:11211");
            using(MemcachedClient cache = new MemcachedClient(cfg))
            {
                Test(cache);
            }
        }
    }
}

Performance:

using System;

using Enyim.Caching;
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached;

namespace Cache.Memcached.Performance
{
    public class Program
    {
        private const int N = 10000;
        private static void Test(MemcachedClient cache)
        {
            // put speed
            DateTime dt1 = DateTime.Now;
            for(int i = 0; i < N; i++)
            {
                cache.Store(StoreMode.Set, "K#" + (i + 1), i + 1);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} puts per second", N * 1000 / (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int REP = 5;
        public static void Main(string[] args)
        {
            MemcachedClientConfiguration cfg = new MemcachedClientConfiguration();
            cfg.Protocol = MemcachedProtocol.Binary;
            cfg.AddServer("localhost:11211");
            using(MemcachedClient cache = new MemcachedClient(cfg))
            {
                for(int i = 0; i < REP; i++)
                {
                    Test(cache);
                }
            }
        }
    }
}
15455 puts per second
16393 puts per second
16501 puts per second
15625 puts per second
15673 puts per second

Concurrency:

using System;
using System.Threading;

using Enyim.Caching;
using Enyim.Caching.Configuration;
using Enyim.Caching.Memcached;

namespace Cache.Memcached.Concurrency
{
    public class Program
    {
        private delegate void F(MemcachedClient cache);
        private static void Test1(MemcachedClient cache) 
        {
            int? hot = (int?)cache.Get("hot");
            hot = hot + 1;
            cache.Store(StoreMode.Set, "hot", hot);
        }
        private static void Test3(MemcachedClient cache) 
        {
            while(true)
            {
                CasResult<int?> res = cache.GetWithCas<int?>("hot");
                int? hot = res.Result;
                hot = hot + 1;
                if(cache.Cas(StoreMode.Replace, "hot", hot, res.Cas).Result) break;
            }
        }
        private static void Testn(MemcachedClient cache, int nrep, F action)
        {
            for(int i = 0; i < nrep; i++)
            {
                action(cache);
            }
        }
        private const int NTHREADS = 10;
        private const int NOPS = 10000;
        private static void Test(MemcachedClient cache, string lbl, F action)
        {
            DateTime dt1 = DateTime.Now;
            cache.Store(StoreMode.Set, "hot", 0);
            Thread[] t = new Thread[NTHREADS];
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i] = new Thread(() => Testn(cache, NOPS/NTHREADS, action));
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Start();
            }
            for(int i = 0; i < NTHREADS; i++)
            {
                t[i].Join();
            }
            int? hot = cache.Get<int?>("hot");
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} : {1} = {2} ({3} ms)", lbl, hot, NOPS, (int)((dt2 - dt1).TotalMilliseconds));
        }
        public static void Main(string[] args)
        {
            MemcachedClientConfiguration cfg = new MemcachedClientConfiguration();
            cfg.Protocol = MemcachedProtocol.Binary;
            cfg.AddServer("localhost:11211");
            using(MemcachedClient cache = new MemcachedClient(cfg))
            {
                Test(cache, "No handling", Test1);
                Test(cache, "Replace with check", Test3);
                // atomic increment does not work in 2.16
            }
        }
    }
}
No handling : 1710 = 10000 (533 ms)
Replace with check : 10000 = 10000 (1751 ms)
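
The low count for "No handling" comes from lost updates: two threads read the same value, both add one and both write the same result back. Replace with check avoids this by letting the write succeed only if the value is unchanged since it was read, and retrying otherwise. A minimal sketch of the idea in Python with an in-memory stand-in for the cache server. TinyCache and its set, gets and cas methods are hypothetical names for illustration, not the API of any real client:

```python
import threading

class TinyCache:
    """Hypothetical in-memory stand-in for a cache server with a version per key."""
    def __init__(self):
        self._guard = threading.Lock()  # the server handles one command at a time
        self._data = {}                 # key -> (value, version)

    def set(self, key, value):
        with self._guard:
            _, version = self._data.get(key, (None, 0))
            self._data[key] = (value, version + 1)

    def gets(self, key):
        with self._guard:
            return self._data[key]      # (value, version)

    def cas(self, key, value, version):
        # store only if the key is unchanged since 'version' was read
        with self._guard:
            if self._data[key][1] != version:
                return False
            self._data[key] = (value, version + 1)
            return True

def no_handling(cache, n):
    for _ in range(n):
        value, _ = cache.gets("hot")
        cache.set("hot", value + 1)     # may overwrite another thread's update

def replace_with_check(cache, n):
    for _ in range(n):
        while True:
            value, version = cache.gets("hot")
            if cache.cas("hot", value + 1, version):
                break                   # nobody interfered, update is done

def run(worker, nthreads=10, nops=10000):
    cache = TinyCache()
    cache.set("hot", 0)
    threads = [threading.Thread(target=worker, args=(cache, nops // nthreads))
               for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return cache.gets("hot")[0]

if __name__ == "__main__":
    print("No handling :", run(no_handling))
    print("Replace with check :", run(replace_with_check))
```

How many updates the unprotected version loses varies from run to run, and in a small in-process test like this it may lose none. The version with check always ends at the full count, at the cost of retries under contention.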

The example will use the libmemcached library.

Functionality:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#ifdef WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netdb.h>
#endif

#include <libmemcached/memcached.h>

static void test(memcached_st *cache)
{
    char *vkey = "v", *counterkey = "counter";
    int vval, counterval;
    size_t vvallen, countervallen; /* memcached_get() expects a size_t pointer for the length */
    char *p;
    memcached_return_t stat;
    /* basic put, get and remove */
    vval = 123;
    stat = memcached_set(cache, vkey, strlen(vkey), (char *)&vval, sizeof(vval), 0, 0);
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("set failed: %s\n", memcached_error(cache));
        exit(1);
    }
    p = memcached_get(cache, vkey, strlen(vkey), &vvallen, NULL, &stat); 
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("get failed: %s\n", memcached_error(cache));
        exit(1);
    }
    vval = *((int *)p);
    printf("%d\n", vval);
    stat = memcached_delete(cache, vkey, strlen(vkey), 0);
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("delete failed: %s\n", memcached_error(cache));
        exit(1);
    }
    p = memcached_get(cache, vkey, strlen(vkey), &vvallen, NULL, &stat); 
    if (stat != MEMCACHED_SUCCESS && stat != MEMCACHED_NOTFOUND)
    {
        printf("get failed: %s\n", memcached_error(cache));
        exit(1);
    }
    if(stat == MEMCACHED_SUCCESS)
    {
        vval = *((int *)p);
        printf("%d\n", vval);
    }
    if(stat == MEMCACHED_NOTFOUND)
    {
        printf("<not found>\n");
    }
    vval = 123;
    stat = memcached_set(cache, vkey, strlen(vkey), (char *)&vval, sizeof(vval), 0, 0);
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("set failed: %s\n", memcached_error(cache));
        exit(1);
    }
    p = memcached_get(cache, vkey, strlen(vkey), &vvallen, NULL, &stat); 
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("get failed: %s\n", memcached_error(cache));
        exit(1);
    }
    vval = *((int *)p);
    printf("%d\n", vval);
    /* persistence */
    p = memcached_get(cache, counterkey, strlen(counterkey), &countervallen, NULL, &stat); 
    if (stat != MEMCACHED_SUCCESS && stat != MEMCACHED_NOTFOUND)
    {
        printf("get failed: %s\n", memcached_error(cache));
        exit(1);
    }
    if(stat == MEMCACHED_SUCCESS)
    {
        counterval = *((int *)p);
    }
    if(stat == MEMCACHED_NOTFOUND)
    {
        counterval = 0;
    }
    counterval++;
    printf("%d\n", counterval);
    stat = memcached_set(cache, counterkey, strlen(counterkey), (char *)&counterval, sizeof(counterval), 0, 0);
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("set failed: %s\n", memcached_error(cache));
        exit(1);
    }
    /* list all not supported */
    /* listener not supported */
}

int main()
{
    const char *cfg = "--SERVER=localhost --BINARY-PROTOCOL";
    memcached_st *cache;
#ifdef WIN32
    WSADATA WSAData;
    WSAStartup(0x0101, &WSAData);
#endif
    cache = memcached(cfg, strlen(cfg));
    test(cache);
    memcached_free(cache);
#ifdef WIN32
    WSACleanup();
#endif
    return 0;
}

Performance:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#ifdef WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netdb.h>
#endif

/* high precision time */
#ifdef WIN32
static inline double xtime()
{
    BOOL res;
    LARGE_INTEGER t, f;
    QueryPerformanceFrequency(&f);
    res = QueryPerformanceCounter(&t);
    return res ? (t.QuadPart * 1.0 / f.QuadPart) : 0;
}
#else
static inline double xtime()
{
    int res;
    struct timespec t;
    res = clock_gettime(CLOCK_MONOTONIC, &t);
    return (res == 0) ? (t.tv_sec + t.tv_nsec / 1000000000.0) : 0;
}
#endif

#include <libmemcached/memcached.h>

#define N 10000

static void test(memcached_st *cache)
{
    char key[10];
    int val;
    memcached_return_t stat;
    double t1, t2;
    int i;
    /* put speed */
    t1 = xtime();
    for(i = 0; i < N; i++)
    {
        sprintf(key, "K#%d", i + 1);
        val = i + 1;
        stat = memcached_set(cache, key, strlen(key), (char *)&val, sizeof(val), 0, 0);
        if (stat != MEMCACHED_SUCCESS)
        {
            printf("set failed: %s\n", memcached_error(cache));
            exit(1);
        }
    }
    t2 = xtime();
    printf("%d puts per second\n", (int)(N / (t2 - t1))); 
}

#define REP 5

int main()
{
    const char *cfg = "--SERVER=localhost --BINARY-PROTOCOL";
    memcached_st *cache;
    int i;
#ifdef WIN32
    WSADATA WSAData;
    WSAStartup(0x0101, &WSAData);
#endif
    cache = memcached(cfg, strlen(cfg));
    for(i = 0; i < REP; i++)
    {
        test(cache);
    }
    memcached_free(cache);
#ifdef WIN32
    WSACleanup();
#endif
    return 0;
}
30887 puts per second
31353 puts per second
31345 puts per second
31340 puts per second
31546 puts per second

Concurrency:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#ifdef WIN32
#include <winsock2.h>
#else
#include <sys/socket.h>
#include <netdb.h>
#endif

/* high precision time */
#ifdef WIN32
static inline double xtime()
{
    BOOL res;
    LARGE_INTEGER t, f;
    QueryPerformanceFrequency(&f);
    res = QueryPerformanceCounter(&t);
    return res ? (t.QuadPart * 1.0 / f.QuadPart) : 0;
}
#else
static inline double xtime()
{
    int res;
    struct timespec t;
    res = clock_gettime(CLOCK_MONOTONIC, &t);
    return (res == 0) ? (t.tv_sec + t.tv_nsec / 1000000000.0) : 0;
}
#endif

/* simple thread API */
#ifdef WIN32
#define T HANDLE
#define TSTART(t, f) t = CreateThread(NULL, 0, f, NULL, 0, NULL)
#define TWAIT(t) WaitForSingleObject(t, INFINITE); CloseHandle(t)
#define TFRETURN DWORD __stdcall
typedef DWORD (__stdcall *fptr)(void *data);
#else
#include <pthread.h>
#define T pthread_t
#define TSTART(t, f) pthread_create(&t, NULL, f, NULL)
#define TWAIT(t) pthread_join(t, NULL);
#define TFRETURN void *
typedef void *(*fptr)(void *data);
#endif

#include <libmemcached/memcached.h>

#define NOPS 10000
#define NTHREADS 10 /* 100 does not work on my Windows */

static TFRETURN test1(void *data)
{
    const char *cfg = "--SERVER=localhost --BINARY-PROTOCOL";
    char *key = "hot";
    long long int val;
    size_t vallen; /* memcached_get() expects a size_t pointer for the length */
    int i;
    char *p;
    memcached_st *cache;
    memcached_return_t stat;
    cache = memcached(cfg, strlen(cfg));
    for(i = 0; i < NOPS / NTHREADS; i++)
    {
        p = memcached_get(cache, key, strlen(key), &vallen, NULL, &stat);
        if (stat != MEMCACHED_SUCCESS)
        {
            printf("get failed: %s\n", memcached_error(cache));
            exit(1);
        }
        val = *((long long int *)p);
        val = val + 1;
        stat = memcached_set(cache, key, strlen(key), (char *)&val, sizeof(val), 0, 0);
        if (stat != MEMCACHED_SUCCESS)
        {
            printf("set failed: %s\n", memcached_error(cache));
            exit(1);
        }
    }
    memcached_free(cache);
    return 0;
}

static TFRETURN test4(void *data)
{
    const char *cfg = "--SERVER=localhost --BINARY-PROTOCOL";
    char *key = "hot";
    uint64_t val; /* memcached_increment() expects a uint64_t pointer */
    int i;
    memcached_st *cache;
    memcached_return_t stat;
    cache = memcached(cfg, strlen(cfg));
    for(i = 0; i < NOPS / NTHREADS; i++)
    {
        stat = memcached_increment(cache, key, strlen(key), 1, &val); 
        if (stat != MEMCACHED_SUCCESS)
        {
            printf("increment failed: %s\n", memcached_error(cache));
            exit(1);
        }
    }
    memcached_free(cache);
    return 0;
}

static void test(memcached_st *cache, char *lbl, fptr f)
{
    char *key = "hot";
    long long int val;
    size_t vallen; /* memcached_get() expects a size_t pointer for the length */
    int i;
    char *p;
    double t1, t2;
    T t[NTHREADS];
    memcached_return_t stat;
    val = 0;
    stat = memcached_set(cache, key, strlen(key), (char *)&val, sizeof(val), 0, 0);
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("set failed: %s\n", memcached_error(cache));
        exit(1);
    }

    if(f == test4)
    {
        /* hack input */
        stat = memcached_delete(cache, key, strlen(key), 0);
        stat = memcached_increment_with_initial(cache, key, strlen(key), 1, 0, 0, &val);
        stat = memcached_decrement(cache, key, strlen(key), 1, &val); 
    }

    t1 = xtime();
    for(i = 0; i < NTHREADS; i++)
    {
        TSTART(t[i], f);
    }
    for(i = 0; i < NTHREADS; i++)
    {
        TWAIT(t[i]);
    }
    t2 = xtime();
    
    if(f == test4)
    {
        /* hack output */
        p = memcached_get(cache, key, strlen(key), &vallen, NULL, &stat); 
        val = atoi(p);
        stat = memcached_set(cache, key, strlen(key), (char *)&val, sizeof(val), 0, 0);
    }

    p = memcached_get(cache, key, strlen(key), &vallen, NULL, &stat); 
    if (stat != MEMCACHED_SUCCESS)
    {
        printf("get failed: %s\n", memcached_error(cache));
        exit(1);
    }
    val = *((long long int *)p);
    /* xtime() returns seconds, so the elapsed time is printed in seconds */
    printf("%s : %lld = %d (%.3f s)\n", lbl, val, NOPS, t2 - t1);
}

int main()
{
    const char *cfg = "--SERVER=localhost --BINARY-PROTOCOL";
    memcached_st *cache;
#ifdef WIN32
    WSADATA WSAData;
    WSAStartup(0x0101, &WSAData);
#endif
    cache = memcached(cfg, strlen(cfg));
    test(cache, "No handling", test1);
    test(cache, "Atomic increment", test4);
    memcached_free(cache);
#ifdef WIN32
    WSACleanup();
#endif
    return 0;
}
No handling : 2055 = 10000 (0.218 s)
Atomic increment : 10000 = 10000 (0.088 s)
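
The "hack input" and "hack output" steps in the test function appear to be needed because memcached's increment operates on a value stored as decimal text, while the rest of the C example stores the raw bytes of an integer. A minimal sketch of the difference in Python. The incr function below is a hypothetical model of the server side rule for illustration, not a libmemcached call:

```python
import struct

def incr(stored, delta):
    """Sketch of the increment rule: the stored bytes must be decimal ASCII digits."""
    if not stored.isdigit():
        raise ValueError("cannot increment non-numeric value")
    # the counter is treated as an unsigned 64 bit number
    return str((int(stored) + delta) % 2**64).encode("ascii")

# 8 zero bytes: what storing (char *)&val of a 64 bit zero sends on a little endian machine
raw = struct.pack("<q", 0)
# the single character "0": what the increment operation can work with
text = b"0"

if __name__ == "__main__":
    print(incr(text, 1))
    try:
        incr(raw, 1)
    except ValueError as ex:
        print("raw binary value rejected:", ex)
```

This is why the example creates the counter through the increment API before the threads start and converts it back to a binary integer afterwards.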

Build Windows:

cl /DWIN32 /I%LIBMEMCACHED%\Include %1.c %LIBMEMCACHED%\Release\libmemcached.lib ws2_32.lib

There are two memcached extensions for PHP: memcache and memcached.

The example will use the memcache extension.

Functionality:

<?php
function test($cache) {
    // basic put, get and remove
    $cache->set('v', 123);
    echo $cache->get('v') . "\r\n";
    $cache->delete('v');
    echo $cache->get('v') . "\r\n";
    $cache->set('v', 123);
    echo $cache->get('v') . "\r\n";
    // persistence
    $counter = $cache->get('counter');
    if(!$counter) {
        $counter = 0;
    }
    $counter++;
    echo $counter . "\r\n";
    $cache->set('counter', $counter);
    // list all not supported
    // listener not supported
}
    
$cache = new Memcache();
$cache->connect('localhost', 11211) or die ("Could not connect to memcached");
test($cache);
$cache->close();
?>

Performance:

<?php
define('N', 10000);
function test($cache) {
    // put speed
    $t1 = gettimeofday(TRUE);
    for($i = 0; $i < N; $i++) {
        $cache->set('K#' . ($i + 1), $i + 1);
    }
    $t2 = gettimeofday(TRUE);
    echo sprintf("%d puts per second\r\n", (int)(N / ($t2 - $t1)));
}

define('REP', 5);
$cache = new Memcache();
$cache->connect('localhost', 11211) or die ("Could not connect to memcached");
for($i = 0; $i < REP; $i++) {
    test($cache);
}
$cache->close();
?>
23334 puts per second
23526 puts per second
20830 puts per second
23609 puts per second
23252 puts per second

Concurrency:

<?php
define('NTHREADS', 10); // 100 does not work
define('NOPS', 10000);

use pht\Thread;

function test1() {
    $cache = new Memcache();
    $cache->connect('localhost', 11211) or die ("Could not connect to memcached");
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $hot = $cache->get('hot');
        $hot = $hot + 1;
        $cache->set('hot', $hot);
    }
    $cache->close();
}

function test4() {
    $cache = new Memcache();
    $cache->connect('localhost', 11211) or die ("Could not connect to memcached");
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $cache->increment('hot');
    }
    $cache->close();
}

function dispatch($me, $f) {
    $cmd = sprintf('%s -d include_path=%s %s %s', PHP_BINARY, get_include_path(), $me, $f);
    shell_exec($cmd);
}

function test($lbl, $me, $f) {
    $cache = new Memcache();
    $cache->connect('localhost', 11211) or die ("Could not connect to memcached");
    $t1 = gettimeofday(TRUE);
    $cache->set('hot', 0);
    $t = array();
    for($i = 0; $i < NTHREADS; $i++) {
        $t[] = new Thread();
        $t[$i]->addFunctionTask('dispatch', $me, $f);
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->start();        
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->join();
    }
    $hot = $cache->get('hot');
    $t2 = gettimeofday(TRUE);
    echo sprintf("%s : %d = %d (%d ms)\r\n", $lbl, $hot, NOPS, (int)(($t2 - $t1) * 1000));
    $cache->close();
}

if(count($argv) < 2) {
    test('No handling', $argv[0], 'test1');
    test('Atomic increment', $argv[0], 'test4');
} else {
    $argv[1]();
}
?>

The test code may look a little weird as it is testing concurrency in a CLI app. Real usage would of course be in PHP pages served by a web server.

No handling : 2950 = 10000 (730 ms)
Atomic increment : 10000 = 10000 (504 ms)

The pymemcache module can be installed via pip.

Functionality:

from pymemcache.client.base import Client

cache = Client(('localhost', 11211))

# basic put, get and remove
cache.set("v", 123)
print(int(cache.get("v")))
cache.delete("v")
print(cache.get("v"))
cache.set("v", 123)
print(int(cache.get("v")))

# persistence
counter = cache.get("counter")
if counter is None:
    counter = 0
else:
    counter = int(counter)
counter = counter + 1
print(counter)
cache.set("counter", counter)

# list all not supported

# listener not supported

cache.close()

Performance:

from pymemcache.client.base import Client
import time

cache = Client(('localhost', 11211))

# put speed
N = 10000
for j in range(5):
    t1 = time.time()
    for i in range(N):
        cache.set("K#" + str(i + 1), i + 1)
    t2 = time.time()
    print("%d puts per second" % (N / (t2 - t1)))

cache.close()
41491 puts per second
42732 puts per second
42550 puts per second
42370 puts per second
42550 puts per second

Concurrency:

from pymemcache.client.base import Client
import multiprocessing
import time

NTHREADS = 10 # CAS hangs for 100
NOPS = 10000

def test1():
    cache = Client(('localhost', 11211))
    for i in range(NOPS//NTHREADS):
        hot = int(cache.get("hot"))
        hot = hot + 1
        cache.set("hot", hot)
    cache.close()

def test3():
    cache = Client(('localhost', 11211))
    for i in range(NOPS//NTHREADS):
        while True:
            hot,tok = cache.gets("hot")
            hot = int(hot)
            hot = hot + 1
            if cache.cas("hot", hot, tok):
                break
    cache.close()

def test4():
    cache = Client(('localhost', 11211))
    for i in range(NOPS//NTHREADS):
        cache.incr("hot", 1)
    cache.close()

def test(lbl, f):
    cache = Client(('localhost', 11211))
    t1 = time.time()
    cache.set("hot", 0)
    t = []
    for i in range(NTHREADS):
        t.append(multiprocessing.Process(target=f, args=()))
    for i in range(NTHREADS):
        t[i].start()
    for i in range(NTHREADS):
        t[i].join()
    hot = int(cache.get("hot"))
    t2 = time.time()
    print("%s : %d = %d (%d ms)" % (lbl, hot, NOPS, (t2 - t1) * 1000))
    cache.close()

if __name__ == '__main__':
    test("No handling", test1)
    test("Replace with check", test3)
    test("Atomic increment", test4)
No handling : 1852 = 10000 (624 ms)
Replace with check : 10000 = 10000 (1909 ms)
Atomic increment : 10000 = 10000 (402 ms)

Note that comparing the speed of Python with the other languages for this test is misleading, as the Python test uses multiple processes while the tests for the other languages use threads.

Redis

Name: Redis
History: Open source cache server first released in 2009
Platform: Linux, FreeBSD/OpenBSD/NetBSD, macOS, Windows, Solaris etc. (native C application)
Server capabilities: standalone cache server, cluster support, persistence support
Client languages: PHP, Python, C#, Java, C, C++, Go, Perl, Ruby, Objective-C, Swift etc.
Optional features: list all, concurrency handling (replace with check, atomic increment)
Namespace: numbered caches
Other: high performance, available as managed service in AWS and Azure

Examples:

There are multiple Redis client libraries available for Java, including Jedis and Redisson.

This example will use Jedis.

Jedis is a simple Redis library for Java exposing an API similar to Redis libraries for other languages.

Functionality:

package cache.redis;

import redis.clients.jedis.Jedis;

public class TestFunctional {
    private static void test(Jedis cache) {
        // basic put, get and remove
        cache.set("v", Integer.toString(123));
        System.out.println(cache.get("v"));
        cache.del("v");
        System.out.println(cache.get("v") != null ? Integer.parseInt(cache.get("v")) : null);
        cache.set("v", Integer.toString(123));
        System.out.println(cache.get("v"));
        // persistence
        int counter;
        if(cache.get("counter") == null) {
            counter = 0;
        } else {
            counter = Integer.parseInt(cache.get("counter"));
        }
        counter++;
        System.out.println(counter);
        cache.set("counter", Integer.toString(counter));
        // list all
        for(String key : cache.keys("*")) {
            System.out.printf("%s = %d\n", key, Integer.parseInt(cache.get(key)));
        }
        // listener not supported
    }
    private static final int FUNCTIONAL_CACHE = 0;
    public static void main(String[] args) throws Exception {
        Jedis cache = new Jedis("localhost", 6379);
        cache.select(FUNCTIONAL_CACHE);
        test(cache);
        cache.close();
    }
}

Performance:

package cache.redis;

import redis.clients.jedis.Jedis;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Jedis cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.set("K#" + (i + 1), Integer.toString(i + 1));
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int PERFORMANCE_CACHE = 1;
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        Jedis cache = new Jedis("localhost", 6379);
        cache.select(PERFORMANCE_CACHE);
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cache.close();
    }
}
24937 puts per second
30120 puts per second
29761 puts per second
30211 puts per second
29940 puts per second

Concurrency:

package cache.redis;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class TestConcurrency {
    private static final int CONCURRENCY_CACHE = 2;
    private static class Test1 implements Runnable {
        private JedisPool pool;
        public Test1(JedisPool pool) {
            this.pool = pool;
        }
        @Override
        public void run() { 
            Jedis cache = pool.getResource();
            cache.select(CONCURRENCY_CACHE);
            int hot = Integer.parseInt(cache.get("hot"));
            hot = hot + 1;
            cache.set("hot", Integer.toString(hot));
            cache.close();
        }
    }
    private static class Test4 implements Runnable {
        private JedisPool pool;
        public Test4(JedisPool pool) {
            this.pool = pool;
        }
        @Override
        public void run() {
            Jedis cache = pool.getResource();
            cache.select(CONCURRENCY_CACHE);
            cache.incrBy("hot",  1);
            cache.close();
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(JedisPool pool, String lbl, Runnable action) throws InterruptedException {
        Jedis cache = pool.getResource(); // Jedis object is not thread safe but JedisPool is
        cache.select(CONCURRENCY_CACHE);
        long t1 = System.currentTimeMillis();
        cache.set("hot", Integer.toString(0));
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = Integer.parseInt(cache.get("hot"));
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
        cache.close();
    }
    public static void main(String[] args) throws Exception {
        JedisPoolConfig cfg = new JedisPoolConfig();
        cfg.setMaxTotal(NTHREADS + 10);
        JedisPool pool = new JedisPool(cfg, "localhost", 6379);
        test(pool, "No handling", new Test1(pool));
        // replace with check did not work for me
        test(pool, "Atomic increment", new Test4(pool));
        pool.close();
    }
}
No handling : 468 = 10000 (781 ms)
Atomic increment : 10000 = 10000 (586 ms)

There are multiple Redis client libraries available for Java, including Jedis and Redisson.

This example will use Redisson.

Redisson is an advanced Redis library for Java exposing an API similar to other cache software libraries for Java.

In some ways Redisson is really distinct cache software built on top of Redis.

Functionality:

package cache.redis;

import java.util.Map.Entry;

import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

public class TestFunctional {
    private static void test(RMap<String,Integer> cache) {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all
        for(Entry<String,Integer> en : cache.entrySet()) {
            System.out.printf("%s = %d\n", en.getKey(), en.getValue());
        }
        // listener does not seem to work as I expect in 3.13.6
    }
    public static void main(String[] args) throws Exception {
        ((Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN); // hack to avoid debug & info output
        Config cfg = new Config();
        //cfg.useClusterServers().addNodeAddress("redis://localhost:6379");
        cfg.useSingleServer().setAddress("redis://localhost:6379");
        RedissonClient client = Redisson.create(cfg);
        RMap<String,Integer> cache = client.getMap("functional_cache");
        test(cache);
        client.shutdown();
    }
}

Performance:

package cache.redis;

import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(RMap<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        ((Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN); // hack to avoid debug & info output
        Config cfg = new Config();
        //cfg.useClusterServers().addNodeAddress("redis://localhost:6379");
        cfg.useSingleServer().setAddress("redis://localhost:6379");
        RedissonClient client = Redisson.create(cfg);
        RMap<String,Integer> cache = client.getMap("performance_cache");
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        client.shutdown();
    }
}
5099 puts per second
8620 puts per second
9433 puts per second
9433 puts per second
9523 puts per second

Concurrency:

package cache.redis;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

import org.redisson.Redisson;
import org.redisson.api.RAtomicLong;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private RMap<String,Integer> cache;
        public Test1(RMap<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() { 
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test2 implements Runnable {
        private RMap<String,Integer> cache;
        private Lock lck;
        public Test2(RMap<String,Integer> cache, Lock lck) {
            this.cache = cache;
            this.lck = lck;
        }
        @Override
        public void run() { 
            lck.lock();
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
            lck.unlock();
        }
    }
    private static class Test3 implements Runnable {
        private RMap<String,Integer> cache;
        public Test3(RMap<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static class Test4 implements Runnable {
        private RAtomicLong counter;
        public Test4(RAtomicLong counter) {
            this.counter = counter;
        }
        @Override
        public void run() {
            counter.addAndGet(1);
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(RMap<String,Integer> cache, String lbl, Runnable action, RAtomicLong counter) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        if(counter != null) {
            cache.put("hot",  (int)counter.get());
        }
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        ((Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN); // hack to avoid debug & info output
        Config cfg = new Config();
        //cfg.useClusterServers().addNodeAddress("redis://localhost:6379");
        cfg.useSingleServer().setAddress("redis://localhost:6379");
        RedissonClient client = Redisson.create(cfg);
        RMap<String,Integer> cache = client.getMap("concurrency_cache");
        Lock lck = client.getFairLock("hot");
        RAtomicLong counter = client.getAtomicLong("superhot");
        counter.set(0);
        test(cache, "No handling", new Test1(cache), null);
        test(cache, "Locking", new Test2(cache, lck), null);
        test(cache, "Replace with check", new Test3(cache), null);
        test(cache, "Atomic increment", new Test4(counter), counter);
        client.shutdown();
    }
}
No handling : 195 = 10000 (1488 ms)
Locking : 10000 = 10000 (17219 ms)
Replace with check : 10000 = 10000 (22353 ms)
Atomic increment : 10000 = 10000 (243 ms)
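
Locking and atomic increment both give the correct count, but they make the read-modify-write atomic in different places: locking wraps a separate get and put on the client side, while atomic increment is one operation executed on the server side. A minimal sketch of the two in Python with an in-memory stand-in for the cache. TinyCache is a hypothetical class for illustration, and a threading.Lock stands in for a distributed lock:

```python
import threading

class TinyCache:
    """Hypothetical in-memory stand-in for a cache server."""
    def __init__(self):
        self._guard = threading.Lock()  # the server handles one command at a time
        self._data = {}

    def get(self, key):
        with self._guard:
            return self._data[key]

    def put(self, key, value):
        with self._guard:
            self._data[key] = value

    def incr(self, key, delta=1):
        # read, add and write happen as one server side operation
        with self._guard:
            self._data[key] += delta
            return self._data[key]

def with_lock(cache, lock, n):
    for _ in range(n):
        with lock:                      # only one client at a time in here
            cache.put("hot", cache.get("hot") + 1)

def with_incr(cache, n):
    for _ in range(n):
        cache.incr("hot")               # no client side coordination needed

def run(worker, *extra, nthreads=10, nops=10000):
    cache = TinyCache()
    cache.put("hot", 0)
    threads = [threading.Thread(target=worker, args=(cache, *extra, nops // nthreads))
               for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return cache.get("hot")

if __name__ == "__main__":
    print("Locking :", run(with_lock, threading.Lock()))
    print("Atomic increment :", run(with_incr))
```

With a real cache server the locked version needs several round trips per update plus the lock traffic, while the atomic increment is a single command, which is consistent with the timings above.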

This example will again use Redisson, this time through the JCache API.

Redisson supports the standard Java cache API JCache defined in JSR 107 from 2014.

redis.yaml configuration file used:

singleServerConfig:
  address: "redis://localhost:6379"

Functionality:

package cache.jcache;

import java.net.URI;
import java.util.Iterator;

import javax.cache.Cache;
import javax.cache.Cache.Entry;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.spi.CachingProvider;

import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

public class TestFunctional {
    private static class MyListener implements CacheEntryCreatedListener<String,Integer>, CacheEntryRemovedListener<String,Integer> {
        @Override
        public void onCreated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> itcee) throws CacheEntryListenerException {
            for(CacheEntryEvent<? extends String, ? extends Integer> cee : itcee) {
                System.out.printf("Add : %s = %d\n", cee.getKey(), cee.getValue());
            }
        }
        @Override
        public void onRemoved(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> itcee) throws CacheEntryListenerException {
            for(CacheEntryEvent<? extends String, ? extends Integer> cee : itcee) {
                System.out.printf("Remove : %s\n", cee.getKey());
            }
        }
    }
    private static void test(Cache<String,Integer> cache) throws InterruptedException {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all (old style)
        Iterator<Entry<String,Integer>> it = cache.iterator();
        while(it.hasNext()) {
            Entry<String,Integer> e = it.next();
            System.out.printf("%s = %d\n", e.getKey(), e.getValue());
        }
        // list all (new style)
        cache.forEach(e -> System.out.printf("%s = %d\n", e.getKey(), e.getValue()));
        // listener
        CacheEntryListenerConfiguration<String,Integer> celc = new MutableCacheEntryListenerConfiguration<String,Integer>(MyListener::new, null, false, true);
        cache.registerCacheEntryListener(celc);
        cache.put("x", 123);
        cache.remove("x");
        Thread.sleep(10); // ensure sufficient time to call listener 
    }
    public static void main(String[] args) throws Exception {
        ((Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN); // hack to avoid debug & info output
        CachingProvider cp = Caching.getCachingProvider("org.redisson.jcache.JCachingProvider");
        CacheManager cm = cp.getCacheManager(new URI("file:/C:/Work/redis.yaml"), null, null);
        Cache<String,Integer> cache = cm.getCache("functional_cache");
        if(cache == null) {
            cache = cm.createCache("functional_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache);
        cache.close();
        cm.close();
        cp.close();
    }
}

Performance:

package cache.jcache;

import java.net.URI;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Cache<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        ((Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN); // hack to avoid debug & info output
        CachingProvider cp = Caching.getCachingProvider("org.redisson.jcache.JCachingProvider");
        CacheManager cm = cp.getCacheManager(new URI("file:/C:/Work/redis.yaml"), null, null);
        Cache<String,Integer> cache = cm.getCache("performance_cache");
        if(cache == null) {
            cache = cm.createCache("performance_cache", new MutableConfiguration<String,Integer>());
        }
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        cache.close();
        cm.close();
        cp.close();
    }
}

4385 puts per second
6711 puts per second
6666 puts per second
6849 puts per second
6896 puts per second
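
Because the test issues one put at a time and waits for each reply, the throughput is essentially the inverse of the round-trip time per put. The following Python sketch converts the figures above into an average time per put. The helper name is made up for illustration.

```python
def micros_per_op(ops_per_second):
    """Average time per operation in microseconds for a sequential test."""
    return 1_000_000 / ops_per_second

# Figures reported by the Redisson JCache test above.
for rate in (4385, 6711, 6666, 6849, 6896):
    print("%d puts per second = %.0f microseconds per put" % (rate, micros_per_op(rate)))
```

That is roughly 0.15 ms per put after the first repetition. The slower first repetition is most likely warm-up (connection setup, JIT compilation), but that was not measured here.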

Concurrency:

package cache.jcache;

import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private Cache<String,Integer> cache;
        public Test1(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static class Test3 implements Runnable {
        private Cache<String,Integer> cache;
        public Test3(Cache<String,Integer> cache) {
            this.cache = cache;
        }
        @Override
        public void run() {
            boolean done = false;
            while(!done) {
                int hot1 = cache.get("hot");
                int hot2 = hot1 + 1;
                done = cache.replace("hot", hot1, hot2);
            }
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(Cache<String,Integer> cache, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        ((Logger)LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN); // hack to avoid debug & info output
        CachingProvider cp = Caching.getCachingProvider("org.redisson.jcache.JCachingProvider");
        CacheManager cm = cp.getCacheManager(new URI("file:/C:/Work/redis.yaml"), null, null);
        Cache<String,Integer> cache = cm.getCache("concurrency_cache");
        if(cache == null) {
            cache = cm.createCache("concurrency_cache", new MutableConfiguration<String,Integer>());
        }
        test(cache, "No handling", new Test1(cache));
        test(cache, "Replace with check", new Test3(cache));
        cache.close();
        cm.close();
        cp.close();
    }
}

No handling : 210 = 10000 (1808 ms)
Replace with check : 10000 = 10000 (31470 ms)
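
The two results above can be reproduced without a server. The following Python sketch uses a small in-memory stand-in for the cache (TinyCache is a made-up class, not part of any library). It first plays out by hand the interleaving that loses an update, and then runs the same retry loop as Test3 from several threads.

```python
import threading

class TinyCache:
    """In-memory stand-in for a remote cache (hypothetical, for illustration)."""
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()  # plays the role of the server serializing commands

    def get(self, key):
        with self._lock:
            return self._data.get(key)

    def put(self, key, value):
        with self._lock:
            self._data[key] = value

    def replace(self, key, old, new):
        """Store new only if the current value still equals old."""
        with self._lock:
            if self._data.get(key) != old:
                return False
            self._data[key] = new
            return True

def lost_update(cache):
    """Two clients interleave read-modify-write by hand; one increment is lost."""
    cache.put("hot", 0)
    a = cache.get("hot")      # client A reads 0
    b = cache.get("hot")      # client B reads 0 before A has written
    cache.put("hot", a + 1)   # A writes 1
    cache.put("hot", b + 1)   # B also writes 1
    return cache.get("hot")

def increment_with_check(cache):
    """Retry until no other client changed the value between read and write."""
    while True:
        old = cache.get("hot")
        if cache.replace("hot", old, old + 1):
            return

def run_with_check(cache, nthreads, nops_per_thread):
    cache.put("hot", 0)
    def worker():
        for _ in range(nops_per_thread):
            increment_with_check(cache)
    threads = [threading.Thread(target=worker) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return cache.get("hot")

print(lost_update(TinyCache()))             # two increments, but the value is 1
print(run_with_check(TinyCache(), 10, 100)) # 10 * 100 increments, value is 1000
```

The retries are probably also the reason why replace with check is so much slower in the test above: with 100 threads working on a single key, many replace calls fail and have to be repeated.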

There is an upcoming Jakarta EE (formerly Java EE) standard for NoSQL databases: Jakarta NoSQL. The reference implementation is Eclipse JNoSQL.

The Redis JNoSQL driver uses Jedis.

Functionality:

package cache.redis.jnosql;

import java.util.Map;

import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.redis.keyvalue.RedisConfiguration;

public class TestFunctional {
    private static void test(Map<String,Integer> cache) {
        // basic put, get and remove
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        cache.remove("v");
        System.out.println(cache.get("v"));
        cache.put("v", 123);
        System.out.println(cache.get("v"));
        // persistence
        Integer counter = cache.get("counter");
        if(counter == null) {
            counter = 0;
        }
        counter++;
        System.out.println(counter);
        cache.put("counter", counter);
        // list all not supported
        // listener not supported
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new RedisConfiguration();
        BucketManagerFactory bmf = cfg.get();
        Map<String,Integer> cache = bmf.getMap("functional_cache", String.class, Integer.class);
        test(cache);
        bmf.close();
    }
}

Performance:

package cache.redis.jnosql;

import java.util.Map;

import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.redis.keyvalue.RedisConfiguration;

public class TestPerformance {
    private static final int N = 10000;
    private static void test(Map<String,Integer> cache) {
        // put speed
        long t1 = System.currentTimeMillis();
        for(int i = 0; i < N; i++) {
            cache.put("K#" + (i + 1), i + 1);
        }
        long t2 = System.currentTimeMillis();
        System.out.printf("%d puts per second\n", N * 1000 / (t2 - t1));
    }
    private static final int REP = 5;
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new RedisConfiguration();
        BucketManagerFactory bmf = cfg.get();
        Map<String,Integer> cache = bmf.getMap("performance_cache", String.class, Integer.class);
        for(int i = 0; i < REP; i++) {
            test(cache);
        }
        bmf.close();
    }
}

19417 puts per second
30120 puts per second
30211 puts per second
28571 puts per second
30674 puts per second

Concurrency:

The JNoSQL API does not currently support any of the concurrency handling paradigms, so only the unprotected read-modify-write is tested.

package cache.redis.jnosql;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import jakarta.nosql.keyvalue.BucketManagerFactory;
import jakarta.nosql.keyvalue.KeyValueConfiguration;

import org.eclipse.jnosql.diana.redis.keyvalue.RedisConfiguration;

public class TestConcurrency {
    private static class Test1 implements Runnable {
        private BucketManagerFactory bmf;
        public Test1(BucketManagerFactory bmf) {
            this.bmf = bmf;
        }
        @Override
        public void run() {
            Map<String,Integer> cache = bmf.getMap("concurrency_cache", String.class, Integer.class);
            int hot = cache.get("hot");
            hot = hot + 1;
            cache.put("hot", hot);
        }
    }
    private static final int NTHREADS = 100;
    private static final int NOPS = 10000;
    private static void test(BucketManagerFactory bmf, String lbl, Runnable action) throws InterruptedException {
        long t1 = System.currentTimeMillis();
        Map<String,Integer> cache = bmf.getMap("concurrency_cache", String.class, Integer.class); // Redis Map (=Jedis object) is not thread safe but BucketManagerFactory (=JedisPool) is
        cache.put("hot", 0);
        ExecutorService es = Executors.newFixedThreadPool(NTHREADS);
        for(int i = 0; i < NOPS; i++) {
            es.submit(action);
        }
        es.shutdown();
        es.awaitTermination(300, TimeUnit.SECONDS);
        int hot = cache.get("hot");
        long t2 = System.currentTimeMillis();
        System.out.printf("%s : %d = %d (%d ms)\n", lbl, hot, NOPS, t2 - t1);
    }
    public static void main(String[] args) throws Exception {
        KeyValueConfiguration cfg = new RedisConfiguration();
        BucketManagerFactory bmf = cfg.get();
        test(bmf, "No handling", new Test1(bmf));
        bmf.close();
    }
}

No handling : 310 = 10000 (273353 ms)

(not sure why it is so slow - I suspect the Jedis pool is not working as it should, but I could not fix it)
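
The comment in the test code above says that a map is not thread safe while the factory is, which is why every task fetches its own map. The general pattern behind that is a connection pool: a connection is borrowed for one unit of work, returned afterwards, and never used by two threads at once. The following Python sketch shows only that pattern. FakeConnection and Pool are made-up names, and the sketch says nothing about how Jedis or JNoSQL implement pooling or about the cause of the slow run above.

```python
import queue
import threading

class FakeConnection:
    """Stand-in for a connection object that must not be shared between threads."""
    pass

class Pool:
    """Fixed-size pool: borrow blocks until a connection is free."""
    def __init__(self, size):
        self._free = queue.Queue()
        for _ in range(size):
            self._free.put(FakeConnection())

    def borrow(self):
        return self._free.get()

    def give_back(self, con):
        self._free.put(con)

def run(pool_size, nthreads, nops):
    """Return the highest number of connections that were in use at the same time."""
    pool = Pool(pool_size)
    in_use = 0
    max_in_use = 0
    guard = threading.Lock()

    def worker():
        nonlocal in_use, max_in_use
        for _ in range(nops):
            con = pool.borrow()
            try:
                with guard:
                    in_use += 1
                    max_in_use = max(max_in_use, in_use)
                # a real worker would send its commands on con here
                with guard:
                    in_use -= 1
            finally:
                pool.give_back(con)

    threads = [threading.Thread(target=worker) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return max_in_use

# 16 threads share 4 connections; never more than 4 are in use at once
print(run(pool_size=4, nthreads=16, nops=50) <= 4)
```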

The example will use the StackExchange.Redis library, available via NuGet.

Functionality:

using System;

using StackExchange.Redis;

namespace Cache.Redis.Functional
{
    public class Program
    {
        private static void Test(IDatabase cache, IServer srv)
        {
            // basic put, get and remove
            cache.StringSet("v", 123);
            Console.WriteLine(cache.StringGet("v"));
            cache.KeyDelete("v");
            Console.WriteLine(cache.StringGet("v"));
            cache.StringSet("v", 123);
            Console.WriteLine(cache.StringGet("v"));
            // persistence
            int? counter = (int?)cache.StringGet("counter");
            if (counter == null)
            {
                counter = 0;
            }
            counter++;
            Console.WriteLine(counter);
            cache.StringSet("counter", counter);
            // list all
            foreach(RedisKey key in srv.Keys(database: FUNCTIONAL_CACHE, pattern: "*")) // list keys from the same database the values are read from
            {
                Console.WriteLine("{0} = {1}", key, cache.StringGet(key));
            }
            // listener not supported
        }
        private const int FUNCTIONAL_CACHE = 0;
        public static void Main(string[] args)
        {
            ConnectionMultiplexer con = ConnectionMultiplexer.Connect("localhost:6379");
            IDatabase cache = con.GetDatabase(FUNCTIONAL_CACHE);
            IServer srv = con.GetServer("localhost", 6379); // the optional third argument is asyncState, not a database number
            Test(cache, srv);
        }
    }
}

Performance:

using System;

using StackExchange.Redis;

namespace Cache.Redis.Performance
{
    public class Program
    {
        private const int N = 10000;
        private static void Test(IDatabase cache)
        {
            // put speed
            DateTime dt1 = DateTime.Now;
            for (int i = 0; i < N; i++)
            {
                cache.StringSet("K#" + (i + 1), i + 1);
            }
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} puts per second", N * 1000 / (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int PERFORMANCE_CACHE = 1;
        private const int REP = 5;
        public static void Main(string[] args)
        {
            ConnectionMultiplexer con = ConnectionMultiplexer.Connect("localhost:6379");
            IDatabase cache = con.GetDatabase(PERFORMANCE_CACHE);
            for (int i = 0; i < REP; i++)
            {
                Test(cache);
            }
        }
    }
}

10183 puts per second
15060 puts per second
15174 puts per second
14903 puts per second
15243 puts per second

Concurrency:

using System;
using System.Threading;

using StackExchange.Redis;

namespace Cache.Redis.Concurrency
{
    public class Program
    {
        private delegate void F(IDatabase cache);
        private static void Test1(IDatabase cache)
        {
            int? hot = (int?)cache.StringGet("hot");
            hot = hot + 1;
            cache.StringSet("hot", hot);
        }
        private static void Test4(IDatabase cache)
        {
            cache.StringIncrement("hot", 1);
        }
        private static void Testn(IDatabase cache, int nrep, F action)
        {
            for (int i = 0; i < nrep; i++)
            {
                action(cache);
            }
        }
        private const int NTHREADS = 10;
        private const int NOPS = 10000;
        private static void Test(IDatabase cache, string lbl, F action)
        {
            DateTime dt1 = DateTime.Now;
            cache.StringSet("hot", 0);
            Thread[] t = new Thread[NTHREADS];
            for (int i = 0; i < NTHREADS; i++)
            {
                t[i] = new Thread(() => Testn(cache, NOPS / NTHREADS, action));
            }
            for (int i = 0; i < NTHREADS; i++)
            {
                t[i].Start();
            }
            for (int i = 0; i < NTHREADS; i++)
            {
                t[i].Join();
            }
            int? hot = (int?)cache.StringGet("hot");
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} : {1} = {2} ({3} ms)", lbl, hot, NOPS, (int)((dt2 - dt1).TotalMilliseconds));
        }
        private const int CONCURRENCY_CACHE = 2;
        public static void Main(string[] args)
        {
            ConnectionMultiplexer con = ConnectionMultiplexer.Connect("localhost:6379");
            IDatabase cache = con.GetDatabase(CONCURRENCY_CACHE);
            Test(cache, "No handling", Test1);
            // replace with check not supported by library
            Test(cache, "Atomic increment", Test4);
        }
    }
}

No handling : 1729 = 10000 (479 ms)
Atomic increment : 10000 = 10000 (231 ms)
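
Atomic increment is both correct and faster. It is correct because Redis executes each command as one indivisible step, so no other client can get in between the read and the write. Part of the speed difference can probably be explained by the number of commands sent: read-modify-write needs a GET and a SET per operation, while increment needs a single command. The following Python sketch counts commands with an in-memory stand-in (CountingCache is a made-up class, no server involved).

```python
class CountingCache:
    """In-memory stand-in for a cache client that counts the commands sent (hypothetical)."""
    def __init__(self):
        self.data = {}
        self.commands = 0

    def get(self, key):
        self.commands += 1
        return self.data.get(key, 0)

    def set(self, key, value):
        self.commands += 1
        self.data[key] = value

    def incr(self, key, amount=1):
        # one command: the server reads, adds and stores in a single step
        self.commands += 1
        self.data[key] = self.data.get(key, 0) + amount
        return self.data[key]

def read_modify_write(cache, nops):
    """GET followed by SET for every operation; returns the number of commands sent."""
    for _ in range(nops):
        cache.set("hot", cache.get("hot") + 1)
    return cache.commands

def atomic_increment(cache, nops):
    """One increment command per operation; returns the number of commands sent."""
    for _ in range(nops):
        cache.incr("hot")
    return cache.commands

print(read_modify_write(CountingCache(), 10000))
print(atomic_increment(CountingCache(), 10000))
```

Twice as many commands fits reasonably well with the 479 ms versus 231 ms measured above, although other factors may of course contribute.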

The example will use the hiredis library.

Functionality:

#include <stdio.h>
#include <stdlib.h> /* exit */

#include <hiredis/hiredis.h>

static void test(redisContext *cache)
{
    char *vkey = "v", *counterkey = "counter";
    int vval, vvallen, counterval, countervallen, i;
    redisReply *reply, *reply2;
    /* basic put, get and remove */
    vval = 123;
    reply = redisCommand(cache, "SET %s %b", vkey, &vval, sizeof(vval));
    if (reply == NULL)
    {
        printf("set failed: %s\n", cache->errstr);
        exit(1);
    }
    freeReplyObject(reply);
    reply = redisCommand(cache, "GET %s", vkey);
    if (reply == NULL)
    {
        printf("get failed: %s\n", cache->errstr);
        exit(1);
    }
    vval = *((int *)reply->str);
    printf("%d\n", vval);
    freeReplyObject(reply);
    reply = redisCommand(cache, "DEL %s", vkey);
    if (reply == NULL)
    {
        printf("delete failed: %s\n", cache->errstr);
        exit(1);
    }
    freeReplyObject(reply);
    reply = redisCommand(cache, "GET %s", vkey);
    if (reply == NULL)
    {
        printf("get failed: %s\n", cache->errstr);
        exit(1);
    }
    if(reply->type == REDIS_REPLY_STRING)
    {
        vval = *((int *)reply->str);
        printf("%d\n", vval);
    }
    else
    {
        printf("<not found>\n");
    }
    freeReplyObject(reply);
    vval = 123;
    reply = redisCommand(cache, "SET %s %b", vkey, &vval, sizeof(vval));
    if (reply == NULL)
    {
        printf("set failed: %s\n", cache->errstr);
        exit(1);
    }
    freeReplyObject(reply);
    reply = redisCommand(cache, "GET %s", vkey);
    if (reply == NULL)
    {
        printf("get failed: %s\n", cache->errstr);
        exit(1);
    }
    vval = *((int *)reply->str);
    printf("%d\n", vval);
    freeReplyObject(reply);
    /* persistence */
    reply = redisCommand(cache, "GET %s", counterkey);
    if (reply == NULL)
    {
        printf("get failed: %s\n", cache->errstr);
        exit(1);
    }
    if(reply->type == REDIS_REPLY_STRING)
    {
        counterval = *((int *)reply->str);
    }
    else
    {
        counterval = 0;
    }
    freeReplyObject(reply);
    counterval++;
    printf("%d\n", counterval);
    reply = redisCommand(cache, "SET %s %b", counterkey, &counterval, sizeof(counterval));
    if (reply == NULL)
    {
        printf("set failed: %s\n", cache->errstr);
        exit(1);
    }
    freeReplyObject(reply);
    /* list all */
    reply = redisCommand(cache, "KEYS *");
    if (reply == NULL)
    {
        printf("keys failed: %s\n", cache->errstr);
        exit(1);
    }
    for(i = 0; i < reply->elements; i++)
    {
        reply2 = redisCommand(cache, "GET %s", reply->element[i]->str);
        if (reply2 == NULL)
        {
            printf("get failed: %s\n", cache->errstr);
            exit(1);
        }
        printf("%s = %d\n", reply->element[i]->str, *((int *)reply2->str));
        freeReplyObject(reply2);
    }
    freeReplyObject(reply);
    /* listener not supported */
}

int main()
{
    redisContext *cache;
    cache = redisConnect("localhost", 6379);
    test(cache);
    redisFree(cache);
    return 0;
}

Performance:

#include <stdio.h>
#include <stdlib.h> /* exit */

/* high precision time */
#ifdef WIN32
#include <windows.h>
static inline double xtime()
{
    BOOL res;
    LARGE_INTEGER t, f;
    QueryPerformanceFrequency(&f);
    res = QueryPerformanceCounter(&t);
    return res ? (t.QuadPart * 1.0 / f.QuadPart) : 0;
}
#else
#include <time.h>
static inline double xtime()
{
    int res;
    struct timespec t;
    res = clock_gettime(CLOCK_MONOTONIC, &t);
    return (res == 0) ? (t.tv_sec + t.tv_nsec / 1000000000.0) : 0;
}
#endif

#include <hiredis/hiredis.h>

#define N 10000

static void test(redisContext *cache)
{
    char key[10];
    int val;
    redisReply *reply;
    double t1, t2;
    int i;
    /* put speed */
    t1 = xtime();
    for(i = 0; i < N; i++)
    {
        sprintf(key, "K#%d", i + 1);
        val = i + 1;
        reply = redisCommand(cache, "SET %s %b", key, &val, sizeof(val));
        if (reply == NULL)
        {
            printf("set failed: %s\n", cache->errstr);
            exit(1);
        }
        freeReplyObject(reply);
    }
    t2 = xtime();
    printf("%d puts per second\n", (int)(N / (t2 - t1))); 
}

#define REP 5

int main()
{
    redisContext *cache;
    int i;
    cache = redisConnect("localhost", 6379);
    for(i = 0; i < REP; i++)
    {
        test(cache);
    }
    redisFree(cache);
    return 0;
}

34748 puts per second
34953 puts per second
34134 puts per second
34643 puts per second
35003 puts per second

Concurrency:

#include <stdio.h>
#include <stdlib.h> /* exit, atoi */
#include <string.h> /* strcpy */

/* high precision time */
#ifdef WIN32
#include <windows.h>
static inline double xtime()
{
    BOOL res;
    LARGE_INTEGER t, f;
    QueryPerformanceFrequency(&f);
    res = QueryPerformanceCounter(&t);
    return res ? (t.QuadPart * 1.0 / f.QuadPart) : 0;
}
#else
#include <time.h>
static inline double xtime()
{
    int res;
    struct timespec t;
    res = clock_gettime(CLOCK_MONOTONIC, &t);
    return (res == 0) ? (t.tv_sec + t.tv_nsec / 1000000000.0) : 0;
}
#endif

/* simple thread API */
#ifdef WIN32
#define T HANDLE
#define TSTART(t, f) t = CreateThread(NULL, 0, f, NULL, 0, NULL)
#define TWAIT(t) WaitForSingleObject(t, INFINITE); CloseHandle(t)
#define TFRETURN DWORD __stdcall
typedef DWORD (__stdcall *fptr)(void *data);
#else
#include <pthread.h>
#define T pthread_t
#define TSTART(t, f) pthread_create(&t, NULL, f, NULL)
#define TWAIT(t) pthread_join(t, NULL);
#define TFRETURN void *
typedef void *(*fptr)(void *data);
#endif

#include <hiredis/hiredis.h>

#define NOPS 10000
#define NTHREADS 100

static TFRETURN test1(void *data)
{
    redisContext *cache;
    redisReply *reply;
    char *key = "hot";
    char val[21];
    int i, ival;
    cache = redisConnect("localhost", 6379);
    for(i = 0; i < NOPS / NTHREADS; i++)
    {
        reply = redisCommand(cache, "GET %s", key);
        if (reply == NULL)
        {
            printf("get failed: %s\n", cache->errstr);
            exit(1);
        }
        strcpy(val, reply->str);
        freeReplyObject(reply);
        ival = atoi(val);
        ival = ival + 1;
        sprintf(val, "%d", ival);
        reply = redisCommand(cache, "SET %s %s", key, val);
        if (reply == NULL)
        {
            printf("set failed: %s\n", cache->errstr);
            exit(1);
        }
        freeReplyObject(reply);
    }
    redisFree(cache);
    return 0;
}

static TFRETURN test4(void *data)
{
    redisContext *cache;
    redisReply *reply;
    char *key = "hot";
    long long int val;
    int i;
    cache = redisConnect("localhost", 6379);
    for(i = 0; i < NOPS / NTHREADS; i++)
    {
        reply = redisCommand(cache, "INCRBY %s %s", key, "1");
        if (reply == NULL)
        {
            printf("incrby failed: %s\n", cache->errstr);
            exit(1);
        }
        freeReplyObject(reply);
    }
    redisFree(cache);
    return 0;
}

static void test(redisContext *cache, char *lbl, fptr f)
{
    redisReply *reply;
    char *key = "hot";
    char val[21];
    int i;
    double t1, t2;
    T t[NTHREADS];
    strcpy(val, "0");
    reply = redisCommand(cache, "SET %s %s", key, val);
    if (reply == NULL)
    {
        printf("set failed: %s\n", cache->errstr);
        exit(1);
    }
    freeReplyObject(reply);
    t1 = xtime();
    for(i = 0; i < NTHREADS; i++)
    {
        TSTART(t[i], f);
    }
    for(i = 0; i < NTHREADS; i++)
    {
        TWAIT(t[i]);
    }
    t2 = xtime();
    reply = redisCommand(cache, "GET %s", key);
    if (reply == NULL)
    {
        printf("get failed: %s\n", cache->errstr);
        exit(1);
    }
    strcpy(val, reply->str);
    freeReplyObject(reply);
    printf("%s : %s = %d (%d ms)\n", lbl, val, NOPS, (int)((t2 - t1) * 1000)); /* xtime returns seconds */
}

int main()
{
    redisContext *cache;
    cache = redisConnect("localhost", 6379);
    test(cache, "No handling", test1);
    test(cache, "Atomic increment", test4);
    redisFree(cache);
    return 0;
}

No handling : 281 = 10000 (278 ms)
Atomic increment : 10000 = 10000 (156 ms)

Build Windows:

cl /MD /D WIN32 /I %HIREDIS%\include %1.c %HIREDIS%\lib\hiredis.lib ws2_32.lib

The example will use the redis extension for PHP.

Functionality:

<?php
function test($cache) {
    // basic put, get and remove
    $cache->set('v', 123);
    echo $cache->get('v') . "\r\n";
    $cache->del('v');
    echo $cache->get('v') . "\r\n";
    $cache->set('v', 123);
    echo $cache->get('v') . "\r\n";
    // persistence
    $counter = $cache->get('counter');
    if(!$counter) {
        $counter = 0;
    }
    $counter++;
    echo $counter . "\r\n";
    $cache->set('counter', $counter);
    // list all (simple)
    foreach($cache->keys('*') as $key) {
        echo sprintf("%s = %d\r\n", $key, $cache->get($key));
    }
    // list all (batched)
    $it = NULL;
    while($keys = $cache->scan($it)) {
        foreach($keys as $key) {
            echo sprintf("%s = %d\r\n", $key, $cache->get($key));
        }
    }
    // listener not supported
}
    
define('FUNCTIONAL_CACHE', 0);
$cache = new Redis();
$cache->connect('localhost', 6379);
$cache->select(FUNCTIONAL_CACHE);
test($cache);
$cache->close();
?>

Performance:

<?php
define('N', 10000);
function test($cache) {
    // put speed
    $t1 = gettimeofday(TRUE);
    for($i = 0; $i < N; $i++) {
        $cache->set('K#' . ($i + 1), $i + 1);
    }
    $t2 = gettimeofday(TRUE);
    echo sprintf("%d puts per second\r\n", (int)(N / ($t2 - $t1)));
}

define('REP', 5);
define('PERFORMANCE_CACHE', 1);
$cache = new Redis();
$cache->connect('localhost', 6379);
$cache->select(PERFORMANCE_CACHE);
for($i = 0; $i < REP; $i++) {
    test($cache);
}
$cache->close();
?>

29235 puts per second
29364 puts per second
29278 puts per second
29364 puts per second
29408 puts per second

Concurrency:

<?php
define('CONCURRENCY_CACHE', 2);
define('NTHREADS', 10); // 100 does not work
define('NOPS', 10000);

use pht\Thread;

function test1() {
    $cache = new Redis();
    $cache->connect('localhost', 6379);
    $cache->select(CONCURRENCY_CACHE);
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $hot = $cache->get('hot');
        $hot = $hot + 1;
        $cache->set('hot', $hot);
    }
    $cache->close();
}

function test4() {
    $cache = new Redis();
    $cache->connect('localhost', 6379);
    $cache->select(CONCURRENCY_CACHE);
    for($i = 0; $i < NOPS/NTHREADS; $i++) {
        $cache->incrBy('hot', 1);
    }
    $cache->close();
}

function dispatch($me, $f) {
    $cmd = sprintf('%s -d include_path=%s %s %s', PHP_BINARY, get_include_path(), $me, $f);
    shell_exec($cmd);
}

function test($lbl, $me, $f) {
    $cache = new Redis();
    $cache->connect('localhost', 6379);
    $cache->select(CONCURRENCY_CACHE);
    $t1 = gettimeofday(TRUE);
    $cache->set('hot', 0);
    $t = array();
    for($i = 0; $i < NTHREADS; $i++) {
        $t[] = new Thread();
        $t[$i]->addFunctionTask('dispatch', $me, $f);
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->start();        
    }
    for($i = 0; $i < NTHREADS; $i++) {
        $t[$i]->join();
    }
    $hot = $cache->get('hot');
    $t2 = gettimeofday(TRUE);
    echo sprintf("%s : %d = %d (%d ms)\r\n", $lbl, $hot, NOPS, (int)(($t2 - $t1) * 1000));
    $cache->close();
}

if(count($argv) < 2) {
    test('No handling', $argv[0], 'test1');
    // replace with check not tested with the redis extension
    test('Atomic increment', $argv[0], 'test4');
} else {
    $argv[1]();
}
?>

The test code may look a little weird as it is testing concurrency in a CLI app. Real usage would of course be in PHP pages served by a web server.

No handling : 2440 = 10000 (7519 ms)
Atomic increment : 10000 = 10000 (758 ms)

Module redis can be installed via pip.

Functionality:

import redis

FUNCTIONAL_CACHE = 0
cache = redis.Redis(host='localhost',port=6379,db=FUNCTIONAL_CACHE)

# basic put, get and remove
cache.set("v", 123)
print(int(cache.get("v")))
cache.delete("v")
print(cache.get("v"))
cache.set("v", 123)
print(int(cache.get("v")))

# persistence
counter = cache.get("counter")
if counter is None:
    counter = 0
else:
    counter = int(counter)
counter = counter + 1
print(counter)
cache.set("counter", counter)

# list all (simple)
for key in cache.keys("*"):
    print("%s = %d" % (key.decode('utf-8'), int(cache.get(key))))

#list all (batched)
for key in cache.scan_iter("*"):
    print("%s = %d" % (key.decode('utf-8'), int(cache.get(key))))

# listener not supported

cache.close()
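
The two ways of listing keys differ in how much the server does per call. KEYS returns everything in one reply, while SCAN returns a batch plus a cursor to continue from, and the iteration is complete when the server returns cursor 0. scan_iter hides that loop. The sketch below imitates the protocol with an in-memory key list so it can run without a server. fake_scan is a made-up stand-in: a real server does not use list offsets as cursors, and it may return empty batches or the same key more than once.

```python
def fake_scan(keys, cursor, count):
    """Return (next_cursor, batch); next_cursor 0 means the iteration is complete."""
    batch = keys[cursor:cursor + count]
    next_cursor = cursor + count
    if next_cursor >= len(keys):
        next_cursor = 0
    return next_cursor, batch

def scan_all(keys, count=10):
    """Client-side loop: keep calling scan until the cursor comes back as 0."""
    found = []
    cursor = 0
    while True:
        cursor, batch = fake_scan(keys, cursor, count)
        found.extend(batch)
        if cursor == 0:
            break
    return found

keys = ["K#%d" % (i + 1) for i in range(25)]
print(len(scan_all(keys)))  # all 25 keys, fetched in batches of 10, 10 and 5
```

Note that the loop stops on cursor 0 and not on an empty batch, since a batch can be empty before the iteration is complete.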

Performance:

import redis
import time

PERFORMANCE_CACHE = 1
cache = redis.Redis(host='localhost',port=6379,db=PERFORMANCE_CACHE)

# put speed
N = 10000
for j in range(5):
    t1 = time.time()
    for i in range(N):
        cache.set("K#" + str(i + 1), i + 1)
    t2 = time.time()
    print("%d puts per second" % (N / (t2 - t1)))

cache.close()

9310 puts per second
9424 puts per second
9433 puts per second
9397 puts per second
9415 puts per second

Concurrency:

import redis
import multiprocessing
import time

CONCURRENCY_CACHE = 2
NTHREADS = 10
NOPS = 10000

def test1():
    cache = redis.Redis(host='localhost',port=6379,db=CONCURRENCY_CACHE)
    for i in range(NOPS//NTHREADS):
        hot = int(cache.get("hot"))
        hot = hot + 1
        cache.set("hot", hot)
    cache.close()

def test2():
    cache = redis.Redis(host='localhost',port=6379,db=CONCURRENCY_CACHE)
    for i in range(NOPS//NTHREADS):
        with cache.pipeline() as pipe:
            while True:
                try:
                    pipe.watch("hot")
                    pipe.multi()
                    hot = int(cache.get("hot"))
                    hot = hot + 1
                    pipe.set("hot", hot)
                    pipe.execute()
                    pipe.unwatch()
                    break
                except redis.WatchError:
                    pass
    cache.close()

def test3():
    cache = redis.Redis(host='localhost',port=6379,db=CONCURRENCY_CACHE)
    for i in range(NOPS//NTHREADS):
        cache.incrby("hot", 1)
    cache.close()

def test(lbl, f):
    cache = redis.Redis(host='localhost',port=6379,db=CONCURRENCY_CACHE)
    t1 = time.time()
    cache.set("hot", 0)
    t = []
    for i in range(NTHREADS):
        t.append(multiprocessing.Process(target=f, args=()))
    for i in range(NTHREADS):
        t[i].start()
    for i in range(NTHREADS):
        t[i].join()
    hot = int(cache.get("hot"))
    t2 = time.time()
    print("%s : %d = %d (%d ms)" % (lbl, hot, NOPS, (t2 - t1) * 1000))
    cache.close()

if __name__ == '__main__':
    test("No handling", test1)
    test("Replace with check", test2)
    test("Atomic increment", test3)

No handling : 2216 = 10000 (939 ms)
Replace with check : 10000 = 10000 (6149 ms)
Atomic increment : 10000 = 10000 (616 ms)

Note that comparing the speed of the Python test with the other languages is misleading, as the Python test is implemented with multiple processes while the tests in the other languages are implemented with threads.
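
The replace with check test in Python relies on WATCH, MULTI and EXEC: the transaction is only executed if nobody has modified the watched key since WATCH was issued, otherwise a WatchError is raised and the client retries. The following sketch imitates that idea with a version number per key. It is a simplification made for illustration (VersionedStore and its methods are made-up names, it uses threads instead of processes, and Redis does not expose version numbers like this).

```python
import threading

class WatchError(Exception):
    """Raised when a watched key changed before the write (made-up class for this sketch)."""

class VersionedStore:
    """In-memory stand-in that tracks a version number per key."""
    def __init__(self):
        self._values = {}
        self._versions = {}
        self._lock = threading.Lock()

    def read(self, key):
        """Return (value, version); remembering the version corresponds to WATCH."""
        with self._lock:
            return self._values.get(key, 0), self._versions.get(key, 0)

    def write_if_unchanged(self, key, value, seen_version):
        """Corresponds to MULTI/SET/EXEC: apply only if nobody wrote since the read."""
        with self._lock:
            if self._versions.get(key, 0) != seen_version:
                raise WatchError(key)
            self._values[key] = value
            self._versions[key] = seen_version + 1

def increment(store, key):
    while True:
        value, version = store.read(key)
        try:
            store.write_if_unchanged(key, value + 1, version)
            return
        except WatchError:
            pass  # somebody else got there first; read again and retry

def run(nthreads, nops_per_thread):
    store = VersionedStore()
    def worker():
        for _ in range(nops_per_thread):
            increment(store, "hot")
    threads = [threading.Thread(target=worker) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return store.read("hot")[0]

print(run(10, 100))  # no increment is lost: 1000
```

Unlike a comparison of values, this kind of check also fails when the key was changed and then changed back to the old value in the meantime.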

Next:

See Database and cache.

Article history:

Version  Date                 Description
1.0      July 1st 2020        Initial version
1.1      October 4th 2020     Add memcached and redis C examples
1.2      October 24th 2020    Add Ignite SQL section
1.3      November 27th 2020   Add Redisson
1.4      January 6th 2021     Add JNoSQL API

Comments:

Please send comments to Arne Vajhøj