Showing posts with label multithreading. Show all posts

Saturday, 17 June 2017

Counting Semaphore, CountDownLatch, CyclicBarrier - synchronization methods for concurrency

Background

With Java 5 a lot of concurrency mechanisms were introduced for synchronization.



In one of the previous posts, we saw what reentrant locks are and how they help us synchronize threads.
Along with ReentrantLock and ExecutorService, Java 5 introduced other concurrency utilities, such as -
  • Counting Semaphore
  • CountDownLatch
  • CyclicBarrier
Today we will try to understand these. Understanding them not only helps with multithreading, they are also a popular topic for Java interview questions. Let's see them one by one.

Counting Semaphore

A semaphore maintains a number of permits for a resource, and only that many threads can access the resource at the same time. If all permits are taken, other threads have to wait until a thread owning a permit releases it.

As an example, let's consider a simple semaphore with 1 permit. It is called a binary semaphore. It behaves much like a mutual-exclusion lock, except that a semaphore has no notion of ownership, so a permit can be released by a thread other than the one that acquired it.

    // requires: import java.util.concurrent.Semaphore;
    public static void main(String args[])
    {
        Semaphore binarySemaphore = new Semaphore(1);
        // both threads run the same task and compete for the single permit
        Runnable task = () -> {
            try {
                binarySemaphore.acquire();
            } catch (InterruptedException e) {
                // no permit was acquired, so there is nothing to release
                Thread.currentThread().interrupt();
                return;
            }
            try {
                System.out.println("Semaphore permit acquired by : " + Thread.currentThread().getName());
            } finally {
                System.out.println("Semaphore permit getting released by : " + Thread.currentThread().getName());
                binarySemaphore.release();
            }
        };
        new Thread(task).start();
        new Thread(task).start();
    }
and a typical output would be (either thread may get the permit first) -
Semaphore permit acquired by : Thread-0
Semaphore permit getting released by : Thread-0
Semaphore permit acquired by : Thread-1
Semaphore permit getting released by : Thread-1


As you can see from the code above, you acquire a permit using the acquire() method and release it using the release() method.


NOTES :
  1. You can also acquire a permit using acquireUninterruptibly(). This is a blocking call that keeps waiting even if the thread is interrupted; the interrupt status is simply set once the call returns.
  2. acquire() is also a blocking call, but unlike acquireUninterruptibly() it can be interrupted, in which case it throws InterruptedException.
  3. You can also use tryAcquire(), which returns immediately: true if a permit was available and has been acquired, false otherwise. So this is a non-blocking call.
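
The example above uses a single permit. As a rough sketch of the "counting" part, the following program lets 10 threads compete for 3 permits and records how many of them were inside the guarded region at the same time. The class name CountingSemaphoreDemo, the helper maxConcurrentHolders() and the 50 ms sleep are made up for this illustration; only Semaphore and its methods come from the JDK.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class CountingSemaphoreDemo {

    // Runs `threads` workers that compete for `permits` permits and returns the
    // highest number of workers seen inside the guarded region at the same time.
    static int maxConcurrentHolders(int permits, int threads) throws InterruptedException {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger maxInside = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    semaphore.acquire(); // blocks until a permit is free
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // nothing acquired, nothing to release
                }
                try {
                    int now = inside.incrementAndGet();
                    maxInside.accumulateAndGet(now, Math::max);
                    Thread.sleep(50); // simulate work on the shared resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    semaphore.release();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return maxInside.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Never more than 3, however the threads are scheduled.
        System.out.println("Max concurrent holders : " + maxConcurrentHolders(3, 10));

        // tryAcquire() never blocks: it only reports whether a permit was free.
        Semaphore single = new Semaphore(1);
        System.out.println(single.tryAcquire()); // true, the only permit is now taken
        System.out.println(single.tryAcquire()); // false, no permit left
        single.release();
    }
}
```

Note that the permit is acquired outside the try/finally block, so release() only runs when a permit was really obtained.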

CountDownLatch

This is another synchronization mechanism, in which one or more threads wait until a predefined number of operations performed by other threads have completed. So let's say there are 10 threads, each making a bread slice. As soon as 5 slices are ready we can pack them together for selling. In this case we can use a CountDownLatch. Initialize one with 5, and as soon as 5 threads acknowledge that they have finished their slices we can start packing (probably in a new thread).

So a thread will wait for n other threads. Let's see an example -

    public static void main(String args[])
    {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Calling countdown by : " + Thread.currentThread().getName());
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
           
        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Calling countdown by : " + Thread.currentThread().getName());
                countDownLatch.countDown();
            } catch (Exception e) {
                e.printStackTrace();
            }
           
        }).start();
       
        try {
            System.out.println("Waiting for all other threads finish operation");
            countDownLatch.await();
            System.out.println("All other threads finish operation!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


Output :
Waiting for all other threads finish operation
Calling countdown by : Thread-1
Calling countdown by : Thread-0
All other threads finish operation!
As you can see, the main thread calls await() on the CountDownLatch and waits for the 2 threads to call countDown() on it. Here n is 2, but you can configure it in the constructor.

Use this when some initial operations must finish before another operation can start.

NOTE :
  1.  CountDownLatch is not reusable. The count cannot be reset, so once it reaches 0, i.e. countDown() has been called n times, every later await() returns immediately.
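
The bread slice scenario described earlier in this section could be sketched roughly as below: 10 baker threads each produce one slice, and the caller is released as soon as 5 slices are ready. The names BreadPackingDemo and waitForPack() and the 5 second timeout are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BreadPackingDemo {

    // Starts `bakers` threads that each produce one slice and blocks the caller
    // until `slicesPerPack` slices are ready. Returns the number of slices that
    // were ready when the caller was released.
    static int waitForPack(int bakers, int slicesPerPack) throws InterruptedException {
        CountDownLatch sliceLatch = new CountDownLatch(slicesPerPack);
        AtomicInteger slicesReady = new AtomicInteger();

        for (int i = 0; i < bakers; i++) {
            new Thread(() -> {
                slicesReady.incrementAndGet(); // the slice is done
                sliceLatch.countDown();        // calls made after the count hits zero have no effect
            }).start();
        }

        // await(timeout, unit) returns false if the count did not reach zero in time.
        if (!sliceLatch.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Timed out waiting for slices");
        }
        return slicesReady.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int ready = waitForPack(10, 5);
        // Between 5 and 10, depending on how the threads were scheduled.
        System.out.println("Packing started, slices ready : " + ready);
    }
}
```

The timed await() is used here only so that a bug in the bakers cannot hang the packer forever; the plain await() from the example above works the same way otherwise.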

CyclicBarrier

CyclicBarrier is yet another synchronization mechanism. Here, all n threads wait for each other to reach the barrier. The participating threads are called parties, and the number of parties is set when the CyclicBarrier is created. Each party reaches the barrier and calls await(), which is a blocking call. Once all parties have called await(), all of them are unblocked and proceed with their execution.

A simple use case is a multiplayer game that should not start until all the players have joined. Here the players are the parties and the game start is the barrier.

Eg -

    public static void main(String args[])
    {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Player joining : " + Thread.currentThread().getName());
                cyclicBarrier.await();
                System.out.println("Game starting from : " + Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            }
           
        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Player joining : " + Thread.currentThread().getName());
                cyclicBarrier.await();
                System.out.println("Game starting from : " + Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            }
           
        }).start();
        new Thread(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("Player joining : " + Thread.currentThread().getName());
                cyclicBarrier.await();
                System.out.println("Game starting from : " + Thread.currentThread().getName());
            } catch (Exception e) {
                e.printStackTrace();
            }
           
        }).start();
       
    }

and the output is -
Player joining : Thread-0
Player joining : Thread-1
Player joining : Thread-2
Game starting from : Thread-2
Game starting from : Thread-0
Game starting from : Thread-1
As you can see, all threads (3 in the above case) wait for each other to reach the barrier. Once they have all reached it and called await(), they can all proceed to their further tasks.
NOTE :
  1. Once all parties have arrived and the barrier trips, it automatically returns to its initial state, so CyclicBarrier can be reused, unlike CountDownLatch. You can also call cyclicBarrier.reset() explicitly; any thread that is waiting at the barrier at that moment returns with java.util.concurrent.BrokenBarrierException.

Differences between CountDownLatch and CyclicBarrier :
  • With a CountDownLatch, one or more threads wait for other threads to complete their work (signalled through countDown()), whereas with a CyclicBarrier the parties wait for each other. You cannot reuse the same CountDownLatch instance once the count reaches zero and the latch is open. A CyclicBarrier, on the other hand, can be used again after it trips, or after calling reset() once it has been broken.
  • Another major difference is that CyclicBarrier takes an (optional) Runnable task which is run once the common barrier condition is met.
  • It also allows you to get the number of parties currently waiting at the barrier (getNumberWaiting()) and the number required to trip it (getParties()).
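
To make the reuse and the optional Runnable more concrete, here is a small sketch in which the same barrier instance is used for several rounds and the barrier action counts how often the barrier tripped. The class BarrierReuseDemo and the method playRounds() are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {

    // Sends `players` threads through `rounds` rounds on ONE barrier and returns
    // how many times the barrier action ran (once per completed round).
    static int playRounds(int players, int rounds) throws InterruptedException {
        AtomicInteger roundsStarted = new AtomicInteger();
        // The optional Runnable runs once each time all parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(players, roundsStarted::incrementAndGet);

        Thread[] threads = new Thread[players];
        for (int i = 0; i < players; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // same barrier instance, reused every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party was interrupted or the barrier was reset
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        return roundsStarted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Rounds started : " + playRounds(3, 2)); // prints 2
    }
}
```

No call to reset() is needed between the rounds; the barrier re-arms itself every time it trips.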

Summary

Semaphore : Manages a fixed number of permits for a pool of resources.
CountDownLatch : One or more threads wait for a set of operations in other threads to finish.
CyclicBarrier : A set of threads wait for each other until they all reach a specific point.


Saturday, 20 May 2017

How ConcurrentHashMap Works Internally in Java

Background

In one of the previous posts we saw how HashMap works and how its time complexity for insertion and deletion is O(1) in the normal case. Though it is a great data structure in terms of time complexity, it is not thread safe, which means you cannot use it directly in multithreaded environments without taking additional precautions like synchronizing put/get on your own. Instead, Java provides a thread-safe implementation, ConcurrentHashMap, which we can use directly in multithreaded environments, for example when populating a map from a parallel stream (introduced in Java 8).

How ConcurrentHashMap Works Internally in Java

Before we see how it is implemented in Java, let's give it some thought. What are the possible problems with a HashMap? Race conditions and invalid state. Let's say two writes happen at the same time. Since a write is not an atomic operation, one value may overwrite the other and the map may end up in an inconsistent state. We could obviously synchronize all reads and writes of a HashMap, but that would be very inefficient: the map would behave as if the application were single threaded, which is certainly not what we expect. To solve this issue Java provides ConcurrentHashMap, which has thread safety built in. Let's see how -

We know how HashMap works. Internally it stores an array of Entry objects, each of which holds a key, a value and a pointer to the next Entry object (a linked list is used in case of collisions). You can think of each array element as a bucket and each Entry object as a data point containing the key (needed to tell apart 2 keys whose hashes collide), the value and a pointer to the next data element.

Working :
ConcurrentHashMap, as the name suggests, allows concurrent reads and writes on the map, but there are limitations. The implementation described here is the segment-based one that shipped before Java 8; Java 8 reworked the class to lock individual buckets instead, but segments are still a good way to understand the idea. ConcurrentHashMap maintains another internal data structure called segments. Each bucket belongs to exactly one segment. The number of segments is derived from the concurrency level, which determines how many threads can write simultaneously. A segment gets locked while data in it is written, updated or removed. Think of segments as locks that prevent concurrent writes to the same bucket, which would lead to inconsistency. So as long as writes go to different segments they can happen in parallel. Reads normally do not need to acquire a lock at all; the most recently completed update is returned.
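
Before going into the internals, here is a minimal sketch of what this buys you from the caller's side: several threads update the same key without any external synchronization and no update is lost. The class ConcurrentCounterDemo and the method countConcurrently() are made up for this illustration, and merge() assumes Java 8 or later.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentCounterDemo {

    // Lets `threads` writers increment one shared counter `incrementsPerThread`
    // times each and returns the final value stored in the map.
    static int countConcurrently(int threads, int incrementsPerThread) throws InterruptedException {
        Map<String, Integer> hits = new ConcurrentHashMap<>();

        Thread[] writers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            writers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    // ConcurrentHashMap performs merge() atomically for a given key.
                    hits.merge("page", 1, Integer::sum);
                }
            });
            writers[i].start();
        }
        for (Thread writer : writers) {
            writer.join();
        }
        return hits.get("page");
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countConcurrently(4, 1000)); // prints 4000
    }
}
```

With a plain HashMap in place of the ConcurrentHashMap the same program may lose updates or corrupt the map.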


 Now lets go step by step -

 Concurrency-Level , Segment array and initialization :
  • First, when you create a ConcurrentHashMap you can provide a concurrency level. This determines the size of the Segment array, which will always be equal to or greater than the concurrency level. If it is not provided, the default concurrency level of 16 is used. The size of the Segment array is capped at - 
    • static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
  • Note that the size of the Segment array is always a power of 2. So if you give a concurrency level of 10, the next power of 2, i.e. 16, is picked and a Segment array of size 16 is created, which implies that up to 16 threads can write to the map simultaneously, as long as they write to different segments.
static final class Segment<K,V> extends ReentrantLock implements Serializable {

    //The number of elements in this segment's region.
    transient volatile int count;
    //The per-segment table. 
    transient volatile HashEntry<K,V>[] table;
}

Putting element in ConcurrentHashMap :

  • To put an element in the map we first need to determine which segment it belongs to. For this we first get the hashCode of the key. Next we rehash it to defend against poor-quality hash functions, as the comment on the method explains -
     /**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.  This is critical
     * because ConcurrentHashMap uses power-of-two length hash tables,
     * that otherwise encounter collisions for hashCodes that do not
     * differ in lower or upper bits.
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }
  •  Once the hash is calculated, you find the segment it belongs to and delegate to that segment's put method as follows -
    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

    final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }


We will see how the segment is computed in some time with a proper example. Once put is delegated to the segment, the segment adds the entry to the appropriate bucket of its own table.

        V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }


Now this is a very interesting method. Let's understand what's happening here.

  • The first call is to lock(). Since this is a write/update operation on a bucket of this segment, we need a lock. If you recollect, the Segment class extends ReentrantLock, so each segment is itself a lock and can call lock() and unlock() directly.
  • Next, it works like a normal HashMap. You find the index in the HashEntry table where your element's hash falls and look for the key in the linked list stored there.
  • As in HashMap, if the key already exists its value is updated. Otherwise a new HashEntry is created and placed at the head of the bucket's linked list, pointing to the previous first entry (which is null for an empty bucket).
  • Finally, once the operation is complete, it calls unlock() so that other threads can continue their updates.
  • Note that lock() is a blocking call. 
  • You can also see the call to rehash() when the threshold is exceeded. Each segment has its own threshold, and when it is crossed that segment's HashEntry table is resized. The Segment array itself is never resized. 
NOTE : For the index into the Segment array the highest N bits of the enhanced hash are used, whereas for the index into the HashEntry table the lowest bits are used (see details in the example below).
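
The idea of one lock per segment (often called lock striping) can be illustrated with a deliberately simplified sketch. This is NOT how the JDK class is written: the name StripedMap, the use of a plain HashMap per stripe and the modulo-based stripe selection are all assumptions made to keep the example short, and unlike the real class this sketch also locks on reads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration of lock striping; not the JDK implementation.
class StripedMap<K, V> {

    // Each stripe plays the role of a segment: a lock plus its own small table.
    private static final class Stripe<K, V> extends ReentrantLock {
        final Map<K, V> table = new HashMap<>();
    }

    private final Stripe<K, V>[] stripes;

    @SuppressWarnings("unchecked")
    StripedMap(int stripeCount) {
        stripes = (Stripe<K, V>[]) new Stripe[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new Stripe<>();
        }
    }

    // Picks the stripe responsible for a key (the mask keeps the hash non-negative).
    private Stripe<K, V> stripeFor(Object key) {
        return stripes[(key.hashCode() & 0x7fffffff) % stripes.length];
    }

    V put(K key, V value) {
        Stripe<K, V> stripe = stripeFor(key);
        stripe.lock(); // only writers to the SAME stripe block each other
        try {
            return stripe.table.put(key, value);
        } finally {
            stripe.unlock();
        }
    }

    V get(Object key) {
        Stripe<K, V> stripe = stripeFor(key);
        stripe.lock(); // the sketch locks on reads too, to stay correct with HashMap
        try {
            return stripe.table.get(key);
        } finally {
            stripe.unlock();
        }
    }

    int size() {
        int total = 0;
        for (Stripe<K, V> stripe : stripes) {
            stripe.lock();
            try {
                total += stripe.table.size();
            } finally {
                stripe.unlock();
            }
        }
        return total;
    }
}
```

Two threads calling put() with keys that map to different stripes never wait for each other, which is the same effect the Segment array gives ConcurrentHashMap.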

Getting element from  ConcurrentHashMap : 

Get on ConcurrentHashMap is very simple, as normally no locks are involved. You simply read the data and return -

        public V get(Object key) {
                int hash = hash(key.hashCode());
                return segmentFor(hash).get(key, hash);
        }

        V get(Object key, int hash) {
            if (count != 0) { // read-volatile
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }


NOTE  : readValueUnderLock method is used as a backup in case a null (pre-initialized) value is ever seen in an unsynchronized access method.

Example

Above was all code and some explanation. Now let's take an actual example.

Let's say we have created a ConcurrentHashMap with a concurrency level of 10. The Segment array is then sized by the following code -

    private static void printSegmentDetails(int concurrencyLevel) {
        int sshift = 0;
        int segmentMask = 0;
        int segmentShift = 0;

        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        System.out.println("Segment array size : " + ssize);
        System.out.println("segmentShift : " + segmentShift);
        System.out.println("segmentMask : " + segmentMask);
    }


Output for 10 concurrency level:
Segment array size : 16
segmentShift : 28
segmentMask : 15

NOTE  : As mentioned before, the segment array is of size 2^N such that 2^N >= concurrency level. In this case it is 2^4 = 16.

Now that we have the segment table in place, let's simulate a put. We need to put a String called "Aniket" as the key. We don't care about the value; just make sure it's not null.

  1. First we calculate the hashCode of the key.
  2. Then we rehash it to get a better hash (as mentioned above).
  3. Then, based on the resulting hash, we find which segment it belongs to.
Remember that the Segment array has size 2^N, so we want the first (highest) N bits of the hash to determine which segment it falls into, since N bits can represent the values 0 to 2^N - 1, which are exactly the indexes of our Segment array. Also remember the code to get this index from above? -
  • int segmentIndex = (hash >>> segmentShift) & segmentMask
This essentially means a logical right shift of the hash by segmentShift bits. Since an int has 32 bits and segmentShift = 32 - sshift, hash >>> segmentShift gives you the first sshift bits (sshift is nothing but the N in 2^N we saw above). segmentMask then keeps just those N bits after the shift.

So in this case,
N  = sshift =  4
2^N = 16 -> Size of segment array
segmentShift = 32 - 4 = 28 (as we saw in output above)
segmentMask = 16 - 1 = 15

    public static void main(String args[]) {    
        String key = "Aniket";
        // hashCode of the key
        System.out.println(key.hashCode());
        // better hash
        System.out.println(hash(key.hashCode()));
        // better hash in binary
        System.out.println(Integer.toBinaryString(hash(key.hashCode())));
        // logical right shift by segmentShift
        System.out.println("Right shifted hash : " + Integer.toBinaryString(hash(key.hashCode()) >>> 28));
        // segment index in binary: bitwise AND of the shifted hash and segmentMask
        System.out.println("Segment Index : " + Integer.toBinaryString((hash(key.hashCode()) >>> 28 ) & 15));
        // segment index in decimal
        System.out.println("Segment Index : " + ((hash(key.hashCode()) >>> 28 ) & 15));
    }


Output :
1965716254
1839402854
1101101101000110000111101100110
Right shifted hash : 110
Segment Index : 110
Segment Index : 6


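NOTE : 1101101101000110000111101100110 is only 31 bits long because the leftmost (most significant) bit is 0 and Integer.toBinaryString() omits leading zeros. The same goes for all subsequent binary representations.

So the element with key "Aniket" goes to index 6 of the Segment array. Inside the segment it's pretty simple to calculate the index in the HashEntry table. Assuming the segment's table has a length of 16 -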

  •  int entryArrayindex = hash & (tab.length - 1);
         int entryArrayindex = (hash(key.hashCode()) & (16 - 1));
         System.out.println("Entry array index : " + entryArrayindex);
         System.out.println("Entry array index in binary : " + Integer.toBinaryString(entryArrayindex));


Output :
Entry array index : 6
Entry array index in binary : 110


So finally the entry is inserted at index 6 of that segment's HashEntry table.

So to summarize, for the index into the Segment array the first (highest) N bits of the enhanced hash are used, whereas for the index into the HashEntry table the last (lowest) bits are used.
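
The whole calculation can be collected into one small, self-contained program. The hash() function is the supplemental hash from the listing above; the class name SegmentIndexDemo and the helpers segmentBits(), segmentIndex() and entryIndex() are names invented for this sketch, and the table length of 16 is the same assumption used in the example.

```java
class SegmentIndexDemo {

    // Supplemental hash, copied from the listing above.
    static int hash(int h) {
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);
        return h ^ (h >>> 16);
    }

    // Smallest N such that 2^N >= concurrencyLevel.
    static int segmentBits(int concurrencyLevel) {
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        return sshift;
    }

    // Index into the Segment array: the highest N bits of the rehashed value.
    static int segmentIndex(Object key, int concurrencyLevel) {
        int sshift = segmentBits(concurrencyLevel);
        int segmentShift = 32 - sshift;
        int segmentMask = (1 << sshift) - 1;
        return (hash(key.hashCode()) >>> segmentShift) & segmentMask;
    }

    // Index into a segment's table: the lowest bits of the rehashed value.
    // tableLength must be a power of 2.
    static int entryIndex(Object key, int tableLength) {
        return hash(key.hashCode()) & (tableLength - 1);
    }

    public static void main(String[] args) {
        System.out.println("Segment index : " + segmentIndex("Aniket", 10)); // 6
        System.out.println("Entry index : " + entryIndex("Aniket", 16));     // 6
    }
}
```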



Friday, 17 June 2016

Iterator Design Pattern

Background

In one of the previous posts we saw an Introduction to Design Patterns. There we learned how different design patterns solve some pre-identified design concerns and provide a good solution. That post also lists the types of design patterns and links to the patterns we have already covered. In this post we will see another design pattern - the Iterator pattern. Is it the same as the iterator I use to iterate over collections? Yes it is! And that is a design pattern? What problem does it solve? Hold on to that, we will get there in some time :)



Problem

Let's say there are two companies - Company A and Company B. They both maintain employee records for their respective employees, and their implementations look something like below - 

Company A Employee records - 

/**
 * 
 * @author athakur
 *
 */
public class CompanyAEmpRecords {

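    private Employee[] companyEmployees = new Employee[10];
    private int index = -1; // index of the last stored employee, -1 when empty

    public void addEmployee(Employee newEmployee) {
        if (index == companyEmployees.length - 1) {
            throw new RuntimeException("Maximum employee limit reached");
        }
        // pre-increment, so the first employee is stored at position 0
        companyEmployees[++index] = newEmployee;
    }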

    public void removeEmployee(String name) {
        // implementation to remove employee
    }

    public int getNoOfEmployees() {
        return index + 1;
    }
    
    public Employee[] getEmployees() {
        return companyEmployees;
    }

}

Company B Employee records - 

/**
 * 
 * @author athakur
 *
 */
public class CompanyBEmpRecords {
    private List<Employee> companyEmployees = new ArrayList<>();

    public void addEmployee(Employee newEmployee) {
        companyEmployees.add(newEmployee);
    }

    public void removeEmployee(String name) {
        // implementation to remove an employee based on name
    }

    public int getNoOfEmployees() {
        return companyEmployees.size();
    }
    
    public List<Employee> getEmployees() {
        return companyEmployees;
    }

}

Life was all good when they were working independently. Company A was small and content with at most 10 employees, whereas Company B did not really care how many employees joined. But then one day they decided to merge and expand their business. Employees of both companies will now be under one entity, and the task of writing code that lists both companies' employees rests on you. You know the employee record implementation of each company, so you start writing code using them.


/**
 * 
 * @author athakur
 *
 */
public class CompanyRecordsPrinter {
    
    public void printCompanyEmployeeRecords(CompanyAEmpRecords companyAEmpRecords, CompanyBEmpRecords companyBEmpRecords) {
        
        Employee[] companyAEmployees = companyAEmpRecords.getEmployees();
        for(int i=0; i< companyAEmpRecords.getNoOfEmployees();i++) {
            System.out.println(companyAEmployees[i]);
        }
        
        List<Employee> companyBEmployees= companyBEmpRecords.getEmployees();
        for(Employee emp : companyBEmployees) {
            System.out.println(emp);
        }
        
    }

}

Well, that serves the purpose. We are printing the employees of both companies using their records. But is it a good design? Two loops for two different types of data structures. What if Company C is merged in later? Add a new loop to handle it? Naah. Something does not feel right. This is where the Iterator pattern comes into the picture.

Iterator Pattern defined

The Iterator pattern provides a way to access the elements of an aggregate object sequentially without exposing its underlying representation.

Solution

You basically have a common interface called Iterator which has methods like -
  • boolean hasNext()
  • Object next()
Each employee record class has a method called getIterator() that returns a new instance of the corresponding Iterator implementation. Let's call them -
  • CompanyAEmpRecordsIterator
  • CompanyBEmpRecordsIterator
Then you can have a common method that takes an object of type Iterator, iterates over it using the hasNext() method and gets the actual data using the next() method.

Sample code -

public class CompanyAEmpRecords implements CompanyEmpRecords {

    private Employee[] companyEmployees = new Employee[10];
    private int index = -1;

    @Override
    public void addEmployee(Employee newEmployee) {
        if (index == 9) {
            throw new RuntimeException("Employees limit reached");
        }
        companyEmployees[++index] = newEmployee;
    }

    @Override
    public void removeEmployee(Employee oldEmployee) {
        int i = 0;
        for (; i <= index; i++) {
            if (companyEmployees[i].equals(oldEmployee)) {
                break;
            }
        }
        if (i > index) {
            return; // employee not found, nothing to remove
        }
        // shift the remaining employees one position to the left
        for (int j = i; j <= index - 1; j++) {
            companyEmployees[j] = companyEmployees[j + 1];
        }
        companyEmployees[index] = null;
        index--;

    }

    @Override
    public int getNoOfEmployees() {
        return index + 1;
    }

    @Override
    public Iterator getIterator() {
        return new CompanyAEmpRecordsIterator();
    }

    private class CompanyAEmpRecordsIterator implements Iterator {

        int currIndex = -1;

        @Override
        public boolean hasNext() {
            if (currIndex + 1 <= index)
                return true;
            else
                return false;
        }

        @Override
        public Object next() {
            if (currIndex + 1 <= index)
                return companyEmployees[++currIndex];
            else
                return null;
        }

    }

}


You get the point. And your printing logic becomes as simple as -

    public void printCompanyEmployeeRecords(CompanyAEmpRecords companyAEmpRecords, CompanyBEmpRecords companyBEmpRecords) {
        
        printRecord(companyAEmpRecords.getIterator());
        printRecord(companyBEmpRecords.getIterator());
    }
    
    private void printRecord(Iterator recordIterator) {
        while(recordIterator.hasNext()) {
            System.out.println(recordIterator.next());
        }
    }


This is just a snippet. The complete code is available in my git repository.
NOTE : Instead of defining your own iterator interface you can use the java.util.Iterator interface.
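
As a rough sketch of that variant, the two record types below implement java.lang.Iterable and hand out java.util.Iterator instances, so one method can walk both. To keep the example self-contained, employees are represented by plain String names, and the classes IteratorPatternDemo, ArrayRecords and ListRecords are made-up stand-ins for the company classes above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class IteratorPatternDemo {

    // Array-backed records, like Company A.
    static class ArrayRecords implements Iterable<String> {
        private final String[] names = new String[10];
        private int size = 0;

        void add(String name) {
            if (size == names.length) {
                throw new IllegalStateException("Employee limit reached");
            }
            names[size++] = name;
        }

        @Override
        public Iterator<String> iterator() {
            return new Iterator<String>() {
                private int cursor = 0; // index of the next element to return

                @Override
                public boolean hasNext() {
                    return cursor < size;
                }

                @Override
                public String next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return names[cursor++];
                }
            };
        }
    }

    // List-backed records, like Company B: the list already provides an iterator.
    static class ListRecords implements Iterable<String> {
        private final List<String> names = new ArrayList<>();

        void add(String name) {
            names.add(name);
        }

        @Override
        public Iterator<String> iterator() {
            return names.iterator();
        }
    }

    // One loop serves every record type, because it depends only on Iterator.
    static List<String> collect(Iterator<String> iterator) {
        List<String> result = new ArrayList<>();
        while (iterator.hasNext()) {
            result.add(iterator.next());
        }
        return result;
    }

    public static void main(String[] args) {
        ArrayRecords companyA = new ArrayRecords();
        companyA.add("Asha");
        ListRecords companyB = new ListRecords();
        companyB.add("Ravi");
        System.out.println(collect(companyA.iterator())); // [Asha]
        System.out.println(collect(companyB.iterator())); // [Ravi]
    }
}
```

Because both classes implement Iterable, they can also be used directly in a for-each loop.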

NOTE :  We have not added a remove() method to the Iterator interface. Neither are we handling what happens when the collection gets modified while you are iterating over it with that iterator. The standard collections handle this as follows: when the iterator is created it copies the collection's modCount (modification count), and if at any point during the iteration the collection's count differs from the copied one, the iterator throws java.util.ConcurrentModificationException.

For sample code you can refer to Iterator provided by ArrayList class in Java -

    private class Itr implements Iterator<E> {
        int cursor;       // index of next element to return
        int lastRet = -1; // index of last element returned; -1 if no such
        int expectedModCount = modCount;

        public boolean hasNext() {
            return cursor != size;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            checkForComodification();
            int i = cursor;
            if (i >= size)
                throw new NoSuchElementException();
            Object[] elementData = ArrayList.this.elementData;
            if (i >= elementData.length)
                throw new ConcurrentModificationException();
            cursor = i + 1;
            return (E) elementData[lastRet = i];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            checkForComodification();

            try {
                ArrayList.this.remove(lastRet);
                cursor = lastRet;
                lastRet = -1;
                expectedModCount = modCount;
            } catch (IndexOutOfBoundsException ex) {
                throw new ConcurrentModificationException();
            }
        }

        final void checkForComodification() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
        }
    }


Modification count and ConcurrentModificationException


Let's try to understand the modCount we just discussed in a better way. A collection such as ArrayList has a field (inherited from AbstractList) declared as

protected transient int modCount = 0;

This keeps track of the structural modifications made to the collection. When you create an iterator for your collection, this modCount gets copied over to the iterator as expectedModCount -

int expectedModCount = modCount;

Then on each call to the iterator's next() or remove(), the checkForComodification() method is called, which does the following -

        final void checkForComodification() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
        }

So if the modCount of the collection is not the same as what was copied over to its iterator during its creation (stored as expectedModCount), the iterator throws ConcurrentModificationException.

The exception to this is when you use the iterator's own remove() method, in which case the iterator updates its expectedModCount accordingly.
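
Both behaviours can be observed with a short experiment on an ArrayList. The class FailFastDemo and its two helper methods are hypothetical names used only for this sketch. Keep in mind that the check is best effort: for example, removing the second-to-last element through the list can go unnoticed, because hasNext() then returns false before next() gets a chance to check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

class FailFastDemo {

    // Removes through the LIST while a for-each loop (an iterator) is running.
    // Returns true if the iterator noticed the change and failed fast.
    static boolean removeViaList(List<String> names) {
        try {
            for (String name : names) {
                if (name.startsWith("A")) {
                    names.remove(name); // bumps modCount behind the iterator's back
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Removes through the ITERATOR, which keeps expectedModCount in sync.
    static List<String> removeViaIterator(List<String> names) {
        Iterator<String> iterator = names.iterator();
        while (iterator.hasNext()) {
            if (iterator.next().startsWith("A")) {
                iterator.remove();
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> first = new ArrayList<>(Arrays.asList("Asha", "Ravi", "Amit", "Neha"));
        System.out.println(removeViaList(first)); // true

        List<String> second = new ArrayList<>(Arrays.asList("Asha", "Ravi", "Amit", "Neha"));
        System.out.println(removeViaIterator(second)); // [Ravi, Neha]
    }
}
```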


Friday, 10 June 2016

Locking and Visibility in Java Multithreaded programs

Background

In one of the previous posts, Race Condition, Synchronization, atomic operations and Volatile keyword, we saw what a race condition is and how we can use synchronization to avoid it. We also saw what volatile variables are and what they are used for. Though that post says a lot about multithreading issues and their solutions, I am writing this post to give an even clearer perspective.

This post is about memory visibility, meaning it is about reading stale values rather than about race conditions.

Issue : Each thread has its own stack, and values may be cached (for example in CPU registers or caches) for faster access. Though this is done for performance, it can lead to undesirable results. Let's say there is a mutable value that is shared between two threads. If thread 1 modifies the value and thread 2 does a subsequent read, it is not guaranteed that thread 2 will read the modified value. This may be because of such caching.



The JVM may also reorder reads and writes for optimization. Understand that we are not talking about race conditions here at all. Even if the operations were atomic, this issue would still happen. So the issue is about the memory visibility of a mutable object across threads, and the challenge is to get the latest/correct value in a read that follows a write.

Issue 2 (Non atomic 64bit operations) : The Java memory model requires fetch and store operations to be atomic. The exception is non-volatile long and double variables, for which the JVM is permitted to treat a 64 bit read or write as two separate 32 bit operations. So if reads and writes of such a variable happen in different threads, a read can return the high 32 bits of one value and the low 32 bits of another. [Solution : declare shared long and double variables as volatile or guard them with a lock]


Solution to Memory visibility issue

We saw the issue with memory visibility. Now let's see how we can resolve it.

Intrinsic locks guarantee that one thread will see the changes made by another thread in a predictable manner. So let's say thread T1 makes some changes inside a synchronized region and then thread T2 enters a region guarded by the same lock (after T1 releases the lock of course). T2 is then guaranteed to see the changes made by T1. So the issue we discussed above will not occur, i.e. no stale values.

Summing it up : Locking is not just about atomicity (making compound operations atomic), it is also about memory visibility. To ensure all threads see the latest value of a shared mutable variable, reading and writing threads must synchronize on a common lock.
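As a rough sketch of this rule, the class below (its name, fields and helper method are all hypothetical) guards both the read and the write of a shared value with the same lock. The reader thread keeps polling through the synchronized getter and is guaranteed to eventually see the value written by the other thread.

```java
// Hypothetical demo class: reads and writes of 'value' both go through the same lock.
class SharedSettingSketch {
    private final Object lock = new Object();
    private int value; // guarded by lock

    void set(int newValue) {
        synchronized (lock) {
            value = newValue;
        }
    }

    int get() {
        synchronized (lock) {
            return value;
        }
    }

    // Starts a reader that polls until it sees a non-zero value, then writes from the caller thread.
    static int writeThenReadFromAnotherThread(int newValue) {
        if (newValue == 0) {
            throw new IllegalArgumentException("use a non-zero value, 0 means 'not written yet'");
        }
        SharedSettingSketch shared = new SharedSettingSketch();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            int current;
            while ((current = shared.get()) == 0) {
                Thread.yield();
            }
            seen[0] = current;
        });
        reader.start();
        shared.set(newValue);
        try {
            reader.join(); // join also makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(writeThenReadFromAnotherThread(42));
    }
}
```

If only the setter were synchronized and the getter were not, the reader would have no such guarantee.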


Another solution of course is to make shared mutable variables volatile. I am not going to discuss volatile here. You can refer to the previous post for details - Race Condition, Synchronization, atomic operations and Volatile keyword. [The volatile keyword guarantees that a read of a volatile variable sees the most recent write to it by any thread; conceptually, reads and writes go to main memory rather than to a thread local cache]
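For the volatile route, a typical use is a stop flag. The sketch below uses hypothetical names. Because stopRequested is volatile, the worker is guaranteed to see the write and leave its loop. Without volatile the loop could, in principle, spin forever on a stale value.

```java
// Hypothetical demo class showing a volatile stop flag.
class VolatileFlagSketch {
    private static volatile boolean stopRequested;

    // Returns true if the worker thread noticed the flag and terminated.
    static boolean workerStopsAfterFlagIsSet() {
        stopRequested = false;
        Thread worker = new Thread(() -> {
            while (!stopRequested) {
                Thread.yield(); // stand-in for real work
            }
        });
        worker.start();
        stopRequested = true; // volatile write, visible to the worker's next read
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(workerStopsAfterFlagIsSet());
    }
}
```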



Related Links

Tuesday, 7 June 2016

ReentrantLock in Java

ReentrantLocks

Let me first try to explain the concept of reentrancy in a simple and generic way. We will come to Java specific details a bit later. Reentrancy in layman's terms means the ability to enter again. In terms of threads it means a thread can acquire the same lock again without blocking itself. Let's refresh our multithreading concepts here. When you synchronize over an object, the thread obtains a lock on it before entering the critical region (inside the synchronized block), and until this thread releases the lock no other thread can acquire it and enter the critical region.

NOTE : We do this to make compound operations atomic so that there is no race condition or invalid state.

But what happens when we call a synchronized instance method from inside another synchronized instance method? Eg -


public class TestClass {

    public synchronized void method1() {
        // some code
        method2();
    }

    public synchronized void method2() {
        // some other code
    }

}

Here a thread has to obtain the lock on the instance (this) before entering either method. Now we are calling method2 from method1, which is again synchronized on the same instance (this). So the thread will try to acquire the lock again. If locks were not reentrant in nature, the thread would block on a lock it already holds and we would end up in a deadlock.

Note : In Java all intrinsic locks are reentrant in nature. 

Note : Synchronization is built around an internal entity known as the intrinsic lock or monitor lock. (The API specification often refers to this entity simply as a "monitor.") Intrinsic locks play a role in both aspects of synchronization: enforcing exclusive access to an object's state and establishing happens-before relationships that are essential to visibility. Every object has an intrinsic lock associated with it. Explicit locks such as ReentrantLock, along with synchronizers like Semaphore and CyclicBarrier, were introduced in Java 1.5.

Now let's see ReentrantLock in Java, which was introduced in Java 1.5.

ReentrantLock  in Java

As per Java doc

A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities like -
  •  It takes a fairness parameter. When set true, under contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular access order. Programs using fair locks accessed by many threads may display lower overall throughput (i.e., are slower; often much slower) than those using the default setting, but have smaller variances in times to obtain locks and guarantee lack of starvation. Note however, that fairness of locks does not guarantee fairness of thread scheduling. Thus, one of many threads using a fair lock may obtain it multiple times in succession while other active threads are not progressing and not currently holding the lock. Also note that the untimed tryLock method does not honor the fairness setting. It will succeed if the lock is available even if other threads are waiting - ReentrantLock(boolean fair)
  • It provides a tryLock() method which acquires the lock only if it is not held by another thread. We can also use a timeout with this method, which means the thread will give up waiting if the lock is not acquired within the timeout. This is better than intrinsic locks, where you have to wait indefinitely.
  • It also lets a thread be interrupted while it is waiting for the lock. ReentrantLock provides a method called lockInterruptibly() [Acquires the lock unless the current thread is interrupted.], so a waiting thread can respond to an interrupt instead of blocking forever.
  • Lastly it also provides functionality to get list of all threads waiting for the lock - getWaitingThreads(Condition condition)
    (Returns a collection containing those threads that may be waiting on the given condition associated with this lock).
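To illustrate the tryLock() with timeout point from the list above, here is a small sketch. The class and helper method are hypothetical, while tryLock(long, TimeUnit) is the real ReentrantLock method. A second thread tries to take the lock with a timeout while the caller may or may not be holding it.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class for timed lock acquisition.
class TryLockSketch {

    // Runs a worker thread that tries to take the lock, waiting at most timeoutMillis.
    // Returns true if the worker got the lock.
    static boolean otherThreadCanAcquire(ReentrantLock lock, long timeoutMillis) {
        boolean[] acquired = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    try {
                        acquired[0] = true;
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // gave up because we were interrupted
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true); // fair lock
        lock.lock();
        try {
            // The main thread holds the lock, so the worker times out.
            System.out.println(otherThreadCanAcquire(lock, 100));
        } finally {
            lock.unlock();
        }
        // The lock is free now, so the worker acquires it.
        System.out.println(otherThreadCanAcquire(lock, 100));
    }
}
```

With an intrinsic lock the worker would have no way to give up after 100 ms; it would simply block until the lock became free.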

NOTE : This lock supports a maximum of 2147483647 recursive locks by the same thread. Attempts to exceed this limit result in an Error being thrown from the locking methods.

Example -


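 import java.util.concurrent.locks.ReentrantLock;

 class Test {
   private final ReentrantLock reLock = new ReentrantLock();
   // ...

   public void m() {
     reLock.lock();  // blocks until the lock is available
     try {
       // ... method body
     } finally {
       reLock.unlock();  // always release in finally
     }
   }
 }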


Working

Reentrancy is achieved by maintaining a counter for the number of times the lock has been acquired, along with the owner of the lock. If the count is 0 and no owner is associated with it, the lock is not held by any thread. When a thread acquires the lock, the owner is recorded and the counter is set to 1. If the same thread acquires the lock again the counter is incremented, and each time the owning thread releases the lock (or exits a synchronized block, in the case of intrinsic locks) the counter is decremented. When the count reaches 0 again the lock is released.
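You can observe this counter directly with ReentrantLock.getHoldCount(). The sketch below uses a hypothetical class name and records the hold count before locking, after the first lock(), after a nested lock() and after both unlocks.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class that records the hold count at each step.
class HoldCountSketch {

    static int[] holdCountsDuringNestedLocking() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[4];
        counts[0] = lock.getHoldCount();         // not held yet
        lock.lock();
        try {
            counts[1] = lock.getHoldCount();     // first acquisition
            lock.lock();                         // same thread, does not block
            try {
                counts[2] = lock.getHoldCount(); // nested acquisition
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        counts[3] = lock.getHoldCount();         // fully released
        return counts;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2, 0]
        System.out.println(java.util.Arrays.toString(holdCountsDuringNestedLocking()));
    }
}
```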


A well known example is the Segments used in ConcurrentHashMap (in Java 7 and earlier). Each segment is essentially a ReentrantLock that allows only a single thread to modify that part of the map at a time. Adding the relevant snippet here -

static final class Segment<K,V> extends ReentrantLock implements Serializable {

    //The number of elements in this segment's region.
    transient volatile int count;
    //The per-segment table. 
    transient volatile HashEntry<K,V>[] table;
}

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        //logic to store data in map
    } finally {
        unlock();
    }
}


NOTE : ReentrantLock was introduced in Java 5.

Related Links

Sunday, 10 April 2016

ThreadLocal class in Java

Background

Java concurrency and multi-threading is a wide topic. If you are coding a multithreaded application you need to take care of thread safety. Not all objects are thread safe. And by thread safety I mean that if a thread is working on some variable, a context switch happens and some other thread alters the value of the variable, then when the first thread comes back it will see a different value, which ultimately will lead to an inconsistent state (race condition).

One way to ensure thread safety is synchronization. Here you need to acquire a lock before entering a critical region. Other threads have to wait until the thread that currently holds the lock releases it.

Another way to ensure thread safety is to use a ThreadLocal variable. This ensures each thread has its own local copy of the variable and cannot access the copy held by other threads. Hence races or inconsistency on that variable cannot arise. Let's see this in more detail.

Example

Consider following example -



public class ThreadLocalDemo {
    
    public static void main(String args[]) throws InterruptedException {    
        MYRunnable myRunnable  = new MYRunnable();
        Thread t1 = new Thread(myRunnable);
        Thread t2 = new Thread(myRunnable);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        
    }
}


class MYRunnable implements Runnable {


    ThreadLocal<String> myThreadLocal = new ThreadLocal<String>(){
        protected String initialValue() {
            return "InitialValue";
        };
    };
    
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " : Before : " + myThreadLocal.get());
        myThreadLocal.set("NewValue1");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " : After 1 : " + myThreadLocal.get());
        myThreadLocal.set("NewValue2");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " : After 2 : " + myThreadLocal.get());
        myThreadLocal.set("NewValue3");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " : After 3 : " + myThreadLocal.get());
    }
    
}

Output is - 

Thread-1 : Before : InitialValue
Thread-0 : Before : InitialValue
Thread-0 : After 1 : NewValue1
Thread-1 : After 1 : NewValue1
Thread-0 : After 2 : NewValue2
Thread-1 : After 2 : NewValue2
Thread-0 : After 3 : NewValue3
Thread-1 : After 3 : NewValue3

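As you can see each thread sees only the values it set itself, even though both threads share the same Runnable (and hence the same ThreadLocal instance). There are no race conditions.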


More Info


  • A good example of ThreadLocal use is SimpleDateFormat. Since SimpleDateFormat is not thread safe, having a global formatter may not work (different threads may alter its internal state and may lead to inconsistent results) but having a per thread formatter will certainly work.
  • What I have generally seen is a ThreadLocal variable being declared static, with its value being set and retrieved per thread.
  • ThreadLocal variables are a bit like local variables (which are only accessible in the block they are declared in). A value stored in a ThreadLocal can only be accessed from the thread that set it.
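The SimpleDateFormat case from the list above can be sketched as follows. The class name, the date pattern and the choice of UTC are assumptions made for the example, and ThreadLocal.withInitial needs Java 8 or later.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper: each thread lazily gets its own SimpleDateFormat instance.
class PerThreadFormatterSketch {
    private static final ThreadLocal<SimpleDateFormat> FORMATTER = ThreadLocal.withInitial(() -> {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format;
    });

    static String formatUtcDate(long epochMillis) {
        // FORMATTER.get() never hands one thread's formatter to another thread.
        return FORMATTER.get().format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(formatUtcDate(0L)); // prints 1970-01-01
    }
}
```

On Java 8 and later the immutable, thread safe java.time.format.DateTimeFormatter is another way to sidestep the problem altogether.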

NOTE 

  • You need to be very careful about cleaning up any ThreadLocals you get() or set(), by calling the ThreadLocal's remove() method when you are done.
  • If you do not clean up, any references it holds to classes loaded as part of a deployed webapp will remain in the permanent heap and will never get garbage collected. Redeploying/undeploying the webapp will not clean up each Thread's reference to your webapp's class(es), since the Thread is not something owned by your webapp. Each successive deployment will create a new instance of the class which will never be garbage collected.
  • So if you use ThreadLocal to store some object instance, there is a high risk that the object stored in the thread local is never garbage collected when your app runs inside an app server like WebLogic Server, which manages a pool of worker threads - even when the class that created this ThreadLocal instance is garbage collected.
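A minimal sketch of the cleanup pattern, using a single thread pool to stand in for an app server's worker pool. The class name, the CONTEXT variable and the "request-1" value are all hypothetical. The first task sets a value and optionally removes it in a finally block, and the second task, which runs on the same pooled thread, reports what it finds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: shows a ThreadLocal value leaking between tasks on a pooled thread.
class ThreadLocalCleanupSketch {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Returns the value the second task sees on the reused worker thread.
    static String valueSeenByNextTask(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable firstTask = () -> {
                CONTEXT.set("request-1");
                try {
                    // ... work that reads CONTEXT.get() ...
                } finally {
                    if (cleanUp) {
                        CONTEXT.remove(); // detach the value from the pooled thread
                    }
                }
            };
            Callable<String> secondTask = CONTEXT::get;
            pool.submit(firstTask).get();
            return pool.submit(secondTask).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenByNextTask(true));  // prints null
        System.out.println(valueSeenByNextTask(false)); // prints request-1
    }
}
```

The second line of output is the leak: the pooled thread still references a value that belonged to an earlier task.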


Related Links

Saturday, 29 August 2015

Difference between StringBuilder and StringBuffer in Java

Background

In Java String class is immutable. So each concatenation or substring operation yields a new String.

Yes, we have all heard that String concatenation is a bad way to create long Strings. Concatenating a large number of Strings, especially in a loop, creates many short lived intermediate String objects and keeps the garbage collector busy. This is one of the reasons why SLF4J (with its parameterized messages) is preferred over LOG4J. In case you need to concatenate many Strings you can use a StringBuilder or StringBuffer instance instead. You can just keep appending your Strings to this instance and finally call toString() to get a single String instance. In this post we will see the difference between StringBuilder and StringBuffer.

StringBuilder and StringBuffer in Java

Their usages are quite simple.

        StringBuffer sBuffer = new StringBuffer("abc");
        sBuffer.append("pqr");
        System.out.println(sBuffer.toString());
        
        StringBuilder sBuilder = new StringBuilder("abc");
        sBuilder.append("pqr");
        System.out.println(sBuilder.toString());



As you can see the usage is similar, but the difference between them is very important, and we will see it now. Just a note: no new Strings are created when append is called. Internally a character array is maintained and the characters of the String are appended to it.

Difference between StringBuilder and StringBuffer in Java

The most important difference between them is 
  • StringBuffer is synchronized, StringBuilder is not.
Hence unless you have a multithreaded scenario to deal with, always go for StringBuilder and not StringBuffer. Even if you do have a multithreaded situation, you can use a synchronized block around the StringBuilder.
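Here is a rough sketch of that last option. The class and method names are hypothetical. Several threads append to one StringBuilder, and every access to it happens inside a synchronized block on the builder itself.

```java
// Hypothetical demo class: a StringBuilder shared between threads, guarded by a synchronized block.
class SharedBuilderSketch {

    // Returns the final length after all threads have finished appending.
    static int appendFromThreads(int threadCount, int appendsPerThread) {
        StringBuilder builder = new StringBuilder();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < appendsPerThread; i++) {
                    synchronized (builder) {
                        builder.append('x');
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        synchronized (builder) {
            return builder.length();
        }
    }

    public static void main(String[] args) {
        System.out.println(appendFromThreads(4, 1000)); // prints 4000
    }
}
```

An explicit block also lets you make several appends atomic as a group, which StringBuffer's per method synchronization cannot do.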

 You can say - "StringBuilder is intended as a drop in replacement for StringBuffer where synchronisation is not required"

Quoting the API docs for StringBuilder -

"A mutable sequence of characters. This class provides an API compatible with StringBuffer, but with no guarantee of synchronization. This class is designed for use as a drop-in replacement for StringBuffer in places where the string buffer was being used by a single thread (as is generally the case). Where possible, it is recommended that this class be used in preference to StringBuffer as it will be faster under most implementations."

A simple test for performance would be -

public class HelloWorld {
    public static void main(String args[])
    {        
        int limit = 1000000;
        long currTime;
        long timeTaken;

        {
            StringBuffer sb = new StringBuffer();
            currTime = System.currentTimeMillis();
            for (int i = limit; i --> 0 ;) {
                sb.append("");
            }
            timeTaken = System.currentTimeMillis() - currTime;
            System.out.println("StringBuffer Time : " + timeTaken);
        }
        {
            StringBuilder sb = new StringBuilder();
            currTime = System.currentTimeMillis();
            for (int i = limit; i --> 0 ;) {
                sb.append("");
            }
            timeTaken = System.currentTimeMillis() - currTime;
            System.out.println("StringBuilder Time : " + timeTaken);
        }   
    }
}


and this prints (one execution) -

StringBuffer Time : 29
StringBuilder Time : 6



Summary

So to summarize, prefer StringBuilder over StringBuffer unless the same instance is genuinely shared between threads.

Related Links


Tuesday, 14 January 2014

Race Condition, Synchronization, atomic operations and Volatile keyword.

In these times, where multi-threading is an essential feature of almost all systems, it is very important for programmers to handle race conditions. All the concepts mentioned in the title of this post - Race Condition, Synchronization, atomic operations and Volatile keyword - are very much related to each other, and understanding them together will give you a better and bigger picture of concurrency.

Race Condition

Consider a very simple increment (++) operation. There is a common misconception that incrementing is an atomic operation. If you are wondering what atomic operations are then be patient. We will discuss them in more detail later. For now you can think of an atomic operation as one that happens as a single indivisible step (loosely, within one CPU cycle), so no other thread can observe it half done.

But it is not. Incrementing is not an atomic operation. If you think a bit deeper you can imagine the increment operation as a sequence of the following operations.
  1. Read the value from the memory.
  2. add one to it.
  3. Write the changed value back to the memory.
The individual operations may or may not be atomic in nature (we will cover this in the volatile section). But for our current discussion let's say the counter is of type int and reads/writes of it are atomic. That means, as far as our discussion of the race condition on increment is concerned, each of the sub operations listed above is atomic.

Now let's see where the problem arises. Let's say we have a single counter instance and its value is 17. Let's say you have two threads, T1 and T2. Both perform an increment operation on the same counter instance, so the result we expect at the end is 19. Let's say T1 reads the value of the counter, which is 17, and then a context switch happens. Then T2 reads the value, which is again 17. Now let's say T2 increments the value to 18 and stores it back in memory. Now T1 increments the value it has (17), so that the result is 18, and stores it back in memory, which overwrites T2's value. So at the end your value is 18 when you expected it to be 19. This problem is called a race condition. You can visualize the problem with following diagram -
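Since a real race depends on thread scheduling and is hard to reproduce on demand, the sketch below simply replays the interleaving described above step by step in a single thread. The class and variable names are made up. t1Read and t2Read stand for the values each thread holds after its read.

```java
// Hypothetical demo class: a deterministic replay of the lost update interleaving.
class LostUpdateSketch {

    static int simulateInterleaving(int counter) {
        int t1Read = counter;  // T1 reads the counter, then a context switch happens
        int t2Read = counter;  // T2 reads the same value
        counter = t2Read + 1;  // T2 writes its incremented value back
        counter = t1Read + 1;  // T1 resumes and overwrites T2's update
        return counter;
    }

    public static void main(String[] args) {
        // Two increments were attempted on 17, but this prints 18 and not 19.
        System.out.println(simulateInterleaving(17));
    }
}
```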


So yes, this is a problem that every programmer faces while programming in a multi-threaded environment. So what is the way out? One could say make a machine level increment operation that happens in a single CPU cycle (or in other words make the increment operation atomic). But we cannot make those machine level changes, can we? There are other alternatives or workarounds that will help us solve the problem. And the first of them is called - Synchronization.

Synchronization

To avoid race conditions we can use synchronization. By synchronizing a part of the code we ensure that only one thread at a time can execute that part of the code. In Java you can have either synchronized methods or synchronized blocks.

Now even these methods or blocks can be further categorized into - 
  1. instance methods/blocks
  2. static/class methods/blocks
To understand the difference between the above two it is essential to understand how the synchronization mechanism works internally. Java is an Object oriented programming language and (apart from primitives) everything in it is essentially an Object. Each Object is associated with a monitor. Synchronization involves a thread getting a lock over this monitor. If one thread holds the lock over the monitor, other threads cannot acquire it and consequently cannot enter the code guarded by that monitor.

Now for instance methods the lock is acquired on the monitor of the instance itself. So for example if you have a synchronized getData() method in the Employee class, then the lock obtained is on the individual Employee instance.

On the other hand, for static methods the lock obtained is on the Class object of the Employee class. You can get this Class object by using ClassName.class or the classInstance.getClass() method. And this Class object is the same for all the individual Employee instances.
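You can check which monitor is held with Thread.holdsLock(). In the sketch below the class name is a hypothetical stand-in for the Employee class mentioned above. It shows that a synchronized instance method holds the lock on this, while a synchronized static method holds the lock on the Class object.

```java
// Hypothetical stand-in for the Employee class discussed above.
class EmployeeLockSketch {

    // A synchronized instance method locks 'this', not the Class object.
    synchronized boolean instanceMethodLocksThis() {
        return Thread.holdsLock(this) && !Thread.holdsLock(EmployeeLockSketch.class);
    }

    // A synchronized static method locks the Class object.
    static synchronized boolean staticMethodLocksClassObject() {
        return Thread.holdsLock(EmployeeLockSketch.class);
    }

    public static void main(String[] args) {
        System.out.println(new EmployeeLockSketch().instanceMethodLocksThis()); // prints true
        System.out.println(staticMethodLocksClassObject());                    // prints true
    }
}
```

One consequence is that a synchronized static method and a synchronized instance method of the same class do not exclude each other, since they use different locks.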

Refer : Interview Question #13 synchronization on static function/method.

So for above counter problem we can synchronize the increment operation with a synchronized function or a block. Eg.

    public synchronized void incrementCounter(){
        counter++;
    }


This involves obtaining the lock on the instance (this) and then incrementing. So as long as one thread is carrying out an increment operation, no other thread can enter this function on the same instance. The analogous synchronized block would be

    public void incrementCounter(){
        synchronized (this) {
            counter++;
        }
    }
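To see the effect end to end, here is a self-contained sketch. The class name and the countWithThreads helper are hypothetical; incrementCounter() is the method from above. Several threads hammer the same counter, and because both the increment and the read are synchronized, no update is lost.

```java
// Hypothetical demo class built around the synchronized incrementCounter() shown above.
class SynchronizedCounterSketch {
    private int counter;

    public synchronized void incrementCounter() {
        counter++;
    }

    public synchronized int getCounter() {
        return counter;
    }

    // Starts threadCount threads that each increment the shared counter incrementsPerThread times.
    static int countWithThreads(int threadCount, int incrementsPerThread) {
        SynchronizedCounterSketch shared = new SynchronizedCounterSketch();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    shared.incrementCounter();
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return shared.getCounter();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10000)); // prints 40000
    }
}
```

If you drop the synchronized keyword from incrementCounter() the total will usually come out lower, although the exact number varies from run to run.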


That is all about the basics of synchronization. We may go into more detailed discussion in another post. But the basic concept of what is synchronization and why is it used should be clear by now.

Note :

The Java synchronized keyword is re-entrant in nature. It means if a synchronized method calls another synchronized method which requires the same lock, then the current thread, which already holds the lock, can enter that method without blocking. (More on Synchronization)


Synchronization is an alternative for operations that are not atomic. So now lets see what atomic operations are.

Atomic operations

As mentioned previously, each higher level language operation may require one or more CPU cycles to complete. It is very much possible that before all the cycles needed for a particular operation are completed there is a context switch, and some other process or thread takes control of the CPU, resulting in the race condition we discussed above. Though we cannot actually manipulate processor instructions, there are workarounds to make an operation atomic.

If you have encountered this context before you must have read -

"The Java language specification guarantees that reading or writing a variable is an atomic operation (unless the variable is of type long or double). Operations on variables of type long or double are only atomic if they are declared with the volatile keyword."

Even if not, let's see what it means. It means reads and writes of primitive variables are atomic in nature. The exception to this is long and double.

So why the exception?
Answer : On some machines such a read or write is not atomic because it is a multiple-step operation at the machine code level. That is, longs and doubles (64 bits) are longer than the processor's word length. On a machine where the processor's word length is at least as large as the long/double size, it is possible for the corresponding read and write operations to be atomic.

Note : Also note that it says only read and write. So do not confuse it with any other operation like incrementing. As mentioned above, the increment operation itself is composed of the 3 sub operations listed earlier.

As per JLS

For the purposes of the Java programming language memory model, a single write to a non-volatile long or double value is treated as two separate writes: one to each 32-bit half. This can result in a situation where a thread sees the first 32 bits of a 64-bit value from one write, and the second 32 bits from another write.

Writes and reads of volatile long and double values are always atomic.

Writes to and reads of references are always atomic, regardless of whether they are implemented as 32-bit or 64-bit values. 


So another way to make reads/writes of long/double atomic is to declare them volatile. I have just mentioned it here since it comes under the atomic title, but will discuss it more in the next topic - Volatile variables.

Java also provides atomic classes for purposes like this. For eg we have AtomicInteger or AtomicLong, which provide methods like getAndDecrement(), getAndIncrement() and getAndSet() which are atomic.

The AtomicInteger class uses low-level CAS (compare-and-swap) CPU operations (no synchronization needed!). They allow you to modify a particular variable only if its present value is equal to an expected value (and report whether the update succeeded). So when you execute getAndIncrement() it actually runs in a loop (simplified real implementation):

int current;
do {
  current = get();
} while (!compareAndSet(current, current + 1));
return current;
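The same retry loop can be written against the public AtomicInteger API. This is only a sketch with hypothetical class and method names, not the JDK's actual implementation. It shows that the CAS loop behaves like getAndIncrement() and loses no updates under contention.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a hand written CAS retry loop on top of AtomicInteger.
class CasLoopSketch {

    // Behaves like AtomicInteger.getAndIncrement(): returns the value seen before the increment.
    static int getAndIncrementWithCas(AtomicInteger value) {
        int current;
        do {
            current = value.get();
            // compareAndSet fails if another thread changed the value since we read it, so we retry
        } while (!value.compareAndSet(current, current + 1));
        return current;
    }

    // Starts threadCount threads that each increment a shared AtomicInteger incrementsPerThread times.
    static int countWithThreads(int threadCount, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    getAndIncrementWithCas(counter);
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10000)); // prints 40000
    }
}
```

In real code you would simply call counter.getAndIncrement() or incrementAndGet(); the loop is spelled out here only to show what happens underneath.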


Volatile keyword

Synchronization solves the problem faced due to race conditions but there is one another problem that we have missed and that is -  visibility.

Consider following code


public static synchronized Singleton getInstance() {
    if (_instance == null) {   // synchronized: only one thread at a time runs this check
        _instance = new Singleton();
    }
    return _instance;
}


 Here synchronization makes sure that only one thread enters the function, and no other thread is allowed to enter it until the thread which holds the lock over its monitor completes its execution. But what about the variable's visibility?

Let's say thread T1 finds that _instance is null, so it creates a new object while holding the lock. Some other thread that reads _instance in another function, without synchronizing on the same lock, may still see _instance as null even after the object has been created. This is because a thread may work with a cached value of a variable, and changes are only guaranteed to become visible to another thread when the writer releases a lock and the reader subsequently acquires the same lock. To overcome this problem we use the volatile keyword.

Volatile keyword in Java guarantees that value of volatile variable will always be read from main memory and not from Thread's local cache thus solving the visibility issue.
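One common way to combine the two ideas is double-checked locking, sketched below. This is a different technique from the fully synchronized method shown earlier, and the class name is made up for illustration. The volatile field lets threads read the instance without taking the lock, and the synchronized block ensures only one thread ever creates it.

```java
// Hypothetical singleton using double-checked locking with a volatile field.
class SingletonSketch {
    private static volatile SingletonSketch instance;

    private SingletonSketch() {
    }

    static SingletonSketch getInstance() {
        SingletonSketch local = instance;   // one volatile read on the fast path
        if (local == null) {
            synchronized (SingletonSketch.class) {
                local = instance;           // re-check while holding the lock
                if (local == null) {
                    local = new SingletonSketch();
                    instance = local;       // volatile write publishes the fully built object
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // prints true
    }
}
```

Without volatile this idiom is broken, because another thread could observe a non-null reference to a partially constructed object.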

Now let's come back to the long and double issue. As the JLS quote above states, declaring a long or double volatile makes its reads and writes atomic. There is also another concept that the volatile keyword follows - the happens-before relationship. A write to a volatile variable happens-before every subsequent read of that variable, which means those reads will see the written value (and everything the writing thread wrote before it).

Useful Links

This post is meant to clear up the basic concepts of concurrency in Java and give you a bigger picture of how the above topics are correlated. Maybe we will have individual posts to cover them one by one. But you can refer to the following links. They provide quite useful information.
I would recommend reading the following book for good exposure to Java concurrency -
  • Java Concurrency in Practice - Book by Brian Goetz
