Distributed Locks

Jul 31, 2021 spring jvm locks

Purpose

The purpose of this document is to implement distributed locking across multiple JVMs using spring integration.

Use Case

Suppose you have an IP to its State table.

|          IP (PK)         | state |      status     |
| ------------------------ | ----- | --------------- |
| 161.185.160.93           |  NY   | completed       |
| 140.241.27.22            |  MA   | completed       |
| 209.205.209.130          |       | new             |
| 98.109.27.225            |       | new             |

Multiple instances read and write to this table. They query by IP and insert a row with IP and new status when it doesn't exist.

Slowly the new rows pile up and someone needs to assign the state for the new IPs.

Who will do it?

Choices:

  1. An instance not part of this cluster.
    1. Pros -> no need for locks.
    2. Cons -> needs to be constantly up, read, and write logic live in different repositories, prone to mistakes.
  2. One instance of this cluster.
    1. Pros -> read and write logic live in the same repo that makes it easier to read.
    2. Cons -> ensure only one instance grabs the lock.

How to ensure only one instance grabs the lock?

Enter spring integration!

We need to do three things:

  1. Request the lock.
  2. Announce to other instances on acquiring the lock (using a shared DB).
  3. Work on your task and release the lock.

1 needs to be done by the user.

2 is done by spring-integration <- can use redis, jdbc, or zookeeper. This post uses jdbc.

3 needs to be done by the user.

Let's look at some code

User code: Add the below inside a @Scheduled(fixedRate = 10000)) annotated method.

//PART 1 and 2
Lock lock = registry.obtain("NEW");
boolean acquired = lock.tryLock(1, TimeUnit.SECONDS);
if (acquired) {
    //PART 3
    try {
        LOG.info("Acquired Lock!");
        List<IpTable> l = ipTableRepository.findByStatus("NEW");
        for (IpTable i : l) {
             LOG.info(i.toString());
             // call third party service to update state and status
        }
    } catch (Exception e) {
        ReflectionUtils.rethrowRuntimeException(e);
    } finally {
        lock.unlock();
        LOG.info("Released Lock!");
    }
} else {
    LOG.info("No Lock!");
}

What is “registry” on line 2?

Here the registry is JdbcLockRegistry. It is a wrapper around any JDBC technology you supply spring integration to store lock information.

@Bean
DefaultLockRepository defaultLockRepository(DataSource dataSource) {
    return new DefaultLockRepository(dataSource);
}

@Bean
JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository) {
    return new JdbcLockRegistry(lockRepository);
}

What is ipTableRepository on line 8?

Table housing IP, state, and status.

@Repository
public interface IpTableRepository extends MongoRepository<IpTable, String> {
    List<IpTable> findByStatus(String status);
}

IpTable?

@Document
public class IpTable {

    @Id
    private String id;
    private String ip;
    private String state;
    private String status;

    // getters, setters & tostring
}

application.properties

spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.datasource.username=postgres
spring.datasource.password=postgres
spring.jpa.hibernate.ddl-auto=create
server.port=0
spring.main.allow-bean-definition-overriding=true


spring.data.mongodb.database=test
spring.data.mongodb.port=27017
spring.data.mongodb.host=localhost

docker-compose.yml file?

version: "3.9"

services:
  db:
    image: postgres
    volumes:
      - ./data/db:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - '5432:5432'
  mongo:
    image: mongo
    #restart: always
    ports:
      - 27017:27017

^ docker-compose up

Log into postgresql and insert INT_LOCK table. Spring integration needs it.

psql -h localhost -p 5432 -U postgres -W
//postgres is the password
CREATE TABLE INT_LOCK  (
        LOCK_KEY CHAR(36) NOT NULL,
        REGION VARCHAR(100) NOT NULL,
        CLIENT_ID CHAR(36),
        CREATED_DATE TIMESTAMP NOT NULL,
        constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);

Log into mongo (robo-mongo) and insert fake data.

db.getCollection('ipTable').insert( { ip: "161.185.160.93", state: "NY", status: "COMPLETED" } )
db.getCollection('ipTable').createIndex( { ip: 1 } )
db.getCollection('ipTable').insert( { ip: "140.241.27.22", state: "MA", status: "COMPLETED" } )
db.getCollection('ipTable').insert( { ip: "209.205.209.130", state: "", status: "NEW" } )
db.getCollection('ipTable').insert( { ip: "98.109.27.225", state: "", status: "NEW" } )

Finally, we are done with the setup.

Let's run it :)

To run multiple JVMs, please check the “Allow parallel run” on “Run/Debug Configurations” on Intellij.

Iteration1:

Run it twice and you'll notice both the JVMs keep acquiring and releasing the lock.

To see one of the JVMs holding a lock while the other one doesn't get it, add a Thread.sleep(8000); after line 12.

Iteration2: With sleep

JVM1

2021-07-30 08:05:22.511  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : Acquired Lock!
2021-07-30 08:05:22.514  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : IpTable[id='61028b8966c5bcea54ac1850', ip='209.205.209.130', state='', status='NEW']
2021-07-30 08:05:22.514  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : IpTable[id='61028b8966c5bcea54ac1851', ip='98.109.27.225', state='', status='NEW']
2021-07-30 08:05:30.526  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : Released Lock!
2021-07-30 08:05:32.514  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : Acquired Lock!
2021-07-30 08:05:32.518  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : IpTable[id='61028b8966c5bcea54ac1850', ip='209.205.209.130', state='', status='NEW']
2021-07-30 08:05:32.518  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : IpTable[id='61028b8966c5bcea54ac1851', ip='98.109.27.225', state='', status='NEW']
2021-07-30 08:05:40.531  INFO 87731 --- [   scheduling-1] c.e.d.work.One                           : Released Lock!

JVM2

2021-07-30 08:05:28.573  INFO 87733 --- [   scheduling-1] c.e.d.work.One                           : No Lock!
2021-07-30 08:05:38.574  INFO 87733 --- [   scheduling-1] c.e.d.work.One                           : No Lock!
2021-07-30 08:05:48.575  INFO 87733 --- [   scheduling-1] c.e.d.work.One                           : No Lock!

JVM1 acquired the lock at 8:05:22, held it until releasing at 8:05:30.

JVM2 tried to acquire the lock at 8:05:28, but couldn't get it.

Hence, only one JVM can acquire a lock at a time.

postgres=# select * from INT_LOCK;
               lock_key               | region  |              client_id               |      created_date
--------------------------------------+---------+--------------------------------------+-------------------------
 24d459a8-1449-3721-8c8f-9a86c2913034 | DEFAULT | 1651d9ff-1889-4ac2-90f8-c5118b011b88 | 2021-07-30 08:05:22.507
(1 row)

Conclusion

We learned how to implement distributed locking across multiple JVMs.

Let me know if you have any questions @anirudhonezero.

References

https://www.youtube.com/watch?v=firwCHbC7-c

Gotchas

What happens if the lock holding JVM dies?

In my observation, when I stopped the JVM on IntelliJ, it always releases the lock but this might not always be true, what happens in a power cut?

Logically, when an instance acquires a lock, it should only grab it for a certain period of time, for example, 5 mins. It can be implemented using the “lockedUntil” field in INT_LOCK. Maybe spring integration will add it in the future.

Another project called Shedlock does the above, give it a try.

{
    "_id" : "AnirudhTestSchedulerLock",
    "lockUntil" : ISODate("2021-07-28T19:53:16.613Z"),
    "lockedAt" : ISODate("2021-07-28T19:52:51.613Z"),
    "lockedBy" : "ip-192-168-0-5.ec2.internal"
}

Hope you enjoyed this newsletter!