Distributed Locks
Purpose
This document shows how to implement distributed locking across multiple JVMs using Spring Integration.
Use Case
Suppose you have a table that maps an IP address to its state.
| IP (PK)         | state | status    |
| --------------- | ----- | --------- |
| 161.185.160.93  | NY    | COMPLETED |
| 140.241.27.22   | MA    | COMPLETED |
| 209.205.209.130 |       | NEW       |
| 98.109.27.225   |       | NEW       |
Multiple instances read and write to this table. They query by IP and, when the IP doesn't exist yet, insert a row with the IP and the status NEW.
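The query-or-insert behaviour can be sketched in plain Java. This is only an in-memory illustration: the class IpLookupSketch, the IpRow type, and the method names are hypothetical, and a ConcurrentHashMap stands in for the real table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the IP table; the real one lives in a database.
final class IpLookupSketch {

    // Hypothetical row type: state stays empty until someone fills it in.
    static final class IpRow {
        final String ip;
        final String state;
        final String status;

        IpRow(String ip, String state, String status) {
            this.ip = ip;
            this.state = state;
            this.status = status;
        }
    }

    private final Map<String, IpRow> table = new ConcurrentHashMap<>();

    // Return the row for this IP, inserting a NEW row first if none exists.
    IpRow findOrInsert(String ip) {
        return table.computeIfAbsent(ip, key -> new IpRow(key, "", "NEW"));
    }

    // Count the rows that are still waiting for a state.
    long countNew() {
        return table.values().stream().filter(row -> row.status.equals("NEW")).count();
    }

    public static void main(String[] args) {
        IpLookupSketch sketch = new IpLookupSketch();
        sketch.findOrInsert("209.205.209.130");
        sketch.findOrInsert("98.109.27.225");
        sketch.findOrInsert("209.205.209.130"); // repeat lookup adds no row
        System.out.println(sketch.countNew()); // prints 2
    }
}
```

Every lookup of an unknown IP leaves a NEW row behind, which is why the backlog grows until something assigns the states.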
Slowly the new rows pile up and someone needs to assign the state for the new IPs.
Who will do it?
Choices:
- An instance that is not part of this cluster.
  - Pros -> no need for locks.
  - Cons -> it needs to be up constantly, and the read and write logic live in different repositories, which is prone to mistakes.
- One instance of this cluster.
  - Pros -> the read and write logic live in the same repo, which makes them easier to follow.
  - Cons -> we must ensure that only one instance grabs the lock.
How to ensure only one instance grabs the lock?
Enter Spring Integration!
We need to do three things:
1. Request the lock.
2. Announce to the other instances that the lock has been acquired (using a shared DB).
3. Work on your task and release the lock.
Step 1 needs to be done by the user.
Step 2 is done by Spring Integration, which can use Redis, JDBC, or ZooKeeper as the shared store. This post uses JDBC.
Step 3 needs to be done by the user.
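The same request / work / release pattern can be tried inside a single JVM with java.util.concurrent.locks. The sketch below is only an analogy: ReentrantLock stands in for the distributed lock, there is no shared DB, and the class TryLockSketch and the helper runIfLockFree are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class TryLockSketch {

    // Request the lock, run the task, release the lock.
    // Returns true if the task ran, false if the lock was busy.
    static boolean runIfLockFree(Lock lock, Runnable task, long waitMillis) {
        boolean acquired;
        try {
            acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!acquired) {
            return false; // someone else holds the lock; skip this round
        }
        try {
            task.run();
            return true;
        } finally {
            lock.unlock(); // always release, even if the task throws
        }
    }

    public static void main(String[] args) throws Exception {
        Lock lock = new ReentrantLock();
        CountDownLatch holding = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // First worker takes the lock and keeps it until told to let go.
        Thread holder = new Thread(() -> runIfLockFree(lock, () -> {
            holding.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0));
        holder.start();
        holding.await();

        System.out.println(runIfLockFree(lock, () -> { }, 50)); // prints false
        release.countDown();
        holder.join();
        System.out.println(runIfLockFree(lock, () -> { }, 50)); // prints true
    }
}
```

The finally block is the important part: whoever gets the lock must give it back even when the task fails.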
Let's look at some code
User code: Add the below inside a method annotated with @Scheduled(fixedRate = 10000).
// PART 1 and 2: request the lock; the registry records it in the shared DB
Lock lock = registry.obtain("NEW");
boolean acquired = lock.tryLock(1, TimeUnit.SECONDS);
if (acquired) {
    // PART 3: do the work, then release the lock
    try {
        LOG.info("Acquired Lock!");
        List<IpTable> l = ipTableRepository.findByStatus("NEW");
        for (IpTable i : l) {
            LOG.info(i.toString());
            // call third party service to update state and status
        }
    } catch (Exception e) {
        ReflectionUtils.rethrowRuntimeException(e);
    } finally {
        lock.unlock();
        LOG.info("Released Lock!");
    }
} else {
    LOG.info("No Lock!");
}
What is “registry” on line 2?
Here the registry is a JdbcLockRegistry. It stores the lock information, through a LockRepository, in whichever JDBC data source you give Spring Integration.
@Bean
DefaultLockRepository defaultLockRepository(DataSource dataSource) {
    return new DefaultLockRepository(dataSource);
}

@Bean
JdbcLockRegistry jdbcLockRegistry(LockRepository lockRepository) {
    return new JdbcLockRegistry(lockRepository);
}
What is ipTableRepository on line 8?
It is the Spring Data repository for the MongoDB collection that holds the IP, state, and status.
@Repository
public interface IpTableRepository extends MongoRepository<IpTable, String> {
    List<IpTable> findByStatus(String status);
}
What is IpTable?
@Document
public class IpTable {
    @Id
    private String id;
    private String ip;
    private String state;
    private String status;
    // getters, setters & toString
}
application.properties
spring.datasource.url=jdbc:postgresql://localhost:5432/postgres
spring.datasource.username=postgres
spring.datasource.password=postgres
spring.jpa.hibernate.ddl-auto=create
server.port=0
spring.main.allow-bean-definition-overriding=true
spring.data.mongodb.database=test
spring.data.mongodb.port=27017
spring.data.mongodb.host=localhost
docker-compose.yml file?
version: "3.9"
services:
  db:
    image: postgres
    volumes:
      - ./data/db:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=postgres
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    ports:
      - '5432:5432'
  mongo:
    image: mongo
    #restart: always
    ports:
      - '27017:27017'
Start both containers with docker-compose up.
Log into PostgreSQL and create the INT_LOCK table. Spring Integration needs it.
psql -h localhost -p 5432 -U postgres -W
(The password is postgres.)
CREATE TABLE INT_LOCK (
    LOCK_KEY CHAR(36) NOT NULL,
    REGION VARCHAR(100) NOT NULL,
    CLIENT_ID CHAR(36),
    CREATED_DATE TIMESTAMP NOT NULL,
    constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
Log into MongoDB (for example with Robomongo) and insert some sample data.
db.getCollection('ipTable').insert( { ip: "161.185.160.93", state: "NY", status: "COMPLETED" } )
db.getCollection('ipTable').createIndex( { ip: 1 } )
db.getCollection('ipTable').insert( { ip: "140.241.27.22", state: "MA", status: "COMPLETED" } )
db.getCollection('ipTable').insert( { ip: "209.205.209.130", state: "", status: "NEW" } )
db.getCollection('ipTable').insert( { ip: "98.109.27.225", state: "", status: "NEW" } )
Finally, we are done with the setup.
Let's run it :)
To run multiple JVMs, check “Allow parallel run” under “Run/Debug Configurations” in IntelliJ.
Iteration1:
Run it twice and you'll notice that both JVMs keep acquiring and releasing the lock.
To see one of the JVMs holding a lock while the other one doesn't get it, add a Thread.sleep(8000); after line 12.
Iteration2: With sleep
JVM1
2021-07-30 08:05:22.511 INFO 87731 --- [ scheduling-1] c.e.d.work.One : Acquired Lock!
2021-07-30 08:05:22.514 INFO 87731 --- [ scheduling-1] c.e.d.work.One : IpTable[id='61028b8966c5bcea54ac1850', ip='209.205.209.130', state='', status='NEW']
2021-07-30 08:05:22.514 INFO 87731 --- [ scheduling-1] c.e.d.work.One : IpTable[id='61028b8966c5bcea54ac1851', ip='98.109.27.225', state='', status='NEW']
2021-07-30 08:05:30.526 INFO 87731 --- [ scheduling-1] c.e.d.work.One : Released Lock!
2021-07-30 08:05:32.514 INFO 87731 --- [ scheduling-1] c.e.d.work.One : Acquired Lock!
2021-07-30 08:05:32.518 INFO 87731 --- [ scheduling-1] c.e.d.work.One : IpTable[id='61028b8966c5bcea54ac1850', ip='209.205.209.130', state='', status='NEW']
2021-07-30 08:05:32.518 INFO 87731 --- [ scheduling-1] c.e.d.work.One : IpTable[id='61028b8966c5bcea54ac1851', ip='98.109.27.225', state='', status='NEW']
2021-07-30 08:05:40.531 INFO 87731 --- [ scheduling-1] c.e.d.work.One : Released Lock!
JVM2
2021-07-30 08:05:28.573 INFO 87733 --- [ scheduling-1] c.e.d.work.One : No Lock!
2021-07-30 08:05:38.574 INFO 87733 --- [ scheduling-1] c.e.d.work.One : No Lock!
2021-07-30 08:05:48.575 INFO 87733 --- [ scheduling-1] c.e.d.work.One : No Lock!
JVM1 acquired the lock at 08:05:22 and held it until it released it at 08:05:30.
JVM2 tried to acquire the lock at 08:05:28, while JVM1 still held it, and couldn't get it.
Hence, only one JVM can hold the lock at a time.
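The timing argument can be double-checked with a little arithmetic on the timestamps from the logs above. This is a small sketch using java.time; the class TimelineCheck and the helper within are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalTime;

final class TimelineCheck {

    // True if t lies inside the closed interval [from, to].
    static boolean within(LocalTime t, LocalTime from, LocalTime to) {
        return !t.isBefore(from) && !t.isAfter(to);
    }

    public static void main(String[] args) {
        // Timestamps copied from the JVM1 and JVM2 logs.
        LocalTime acquired = LocalTime.parse("08:05:22.511");
        LocalTime released = LocalTime.parse("08:05:30.526");
        LocalTime noLock = LocalTime.parse("08:05:28.573");

        // How long JVM1 held the lock; close to the 8000 ms sleep.
        System.out.println(Duration.between(acquired, released).toMillis()); // prints 8015

        // JVM2's "No Lock!" falls inside JVM1's holding window.
        System.out.println(within(noLock, acquired, released)); // prints true
    }
}
```

The 8015 ms hold matches the Thread.sleep(8000) plus a few milliseconds of work, and JVM2's failed attempt sits inside that window.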
postgres=# select * from INT_LOCK;
lock_key | region | client_id | created_date
--------------------------------------+---------+--------------------------------------+-------------------------
24d459a8-1449-3721-8c8f-9a86c2913034 | DEFAULT | 1651d9ff-1889-4ac2-90f8-c5118b011b88 | 2021-07-30 08:05:22.507
(1 row)
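Note that lock_key holds a UUID rather than the name "NEW" passed to registry.obtain. The 3 at the start of its third group marks it as a name-based (version 3) UUID, which is the kind java.util.UUID.nameUUIDFromBytes produces. The sketch below assumes the key is derived that way; this is a guess about the registry's internals that the post does not verify, and LockKeySketch and lockKeyFor are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

final class LockKeySketch {

    // Hypothetical helper: hash a lock name into a fixed-length key.
    // Assumption: name-based (version 3) UUID over the UTF-8 bytes of the name.
    static String lockKeyFor(String lockName) {
        return UUID.nameUUIDFromBytes(lockName.getBytes(StandardCharsets.UTF_8)).toString();
    }

    public static void main(String[] args) {
        String key = lockKeyFor("NEW");
        // The same name always gives the same key, and every key is 36 characters long.
        System.out.println(key + " (" + key.length() + " chars)");
    }
}
```

Hashing the name gives every instance the same fixed-length key for the same lock name, which fits the CHAR(36) type of the LOCK_KEY column.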
Conclusion
We learned how to implement distributed locking across multiple JVMs.
Let me know if you have any questions @anirudhonezero.
References
https://www.youtube.com/watch?v=firwCHbC7-c
Gotchas
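What happens if the JVM holding the lock dies?
In my observation, when I stopped the JVM in IntelliJ, it always released the lock, but this might not always be true. What happens in a power cut?
Logically, when an instance acquires a lock, it should hold it only for a limited period of time, for example 5 minutes, so that a dead holder cannot block everyone else forever. One way to implement this is a “lockedUntil” field stored with the lock; the INT_LOCK table above has no such column. Spring Integration's DefaultLockRepository does have a time-to-live setting (setTimeToLive) for expiring stale locks based on CREATED_DATE, so check the documentation for the version you use.
Another project, ShedLock, stores the expiry with the lock; give it a try. Its lock document looks like this: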
{
"_id" : "AnirudhTestSchedulerLock",
"lockUntil" : ISODate("2021-07-28T19:53:16.613Z"),
"lockedAt" : ISODate("2021-07-28T19:52:51.613Z"),
"lockedBy" : "ip-192-168-0-5.ec2.internal"
}
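The idea behind lockUntil can be sketched in a few lines of Java. This is a minimal in-memory illustration of a lock with a lease, not ShedLock's or Spring Integration's implementation; ExpiringLockSketch, LockRow, tryAcquire, and release are hypothetical names, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

final class ExpiringLockSketch {

    // One lock record, mirroring the lockedBy / lockUntil fields shown above.
    static final class LockRow {
        final String lockedBy;
        final Instant lockUntil;

        LockRow(String lockedBy, Instant lockUntil) {
            this.lockedBy = lockedBy;
            this.lockUntil = lockUntil;
        }
    }

    private final Map<String, LockRow> rows = new HashMap<>();

    // Take the lock if it is free or the previous holder's lease has run out.
    synchronized boolean tryAcquire(String name, String owner, Instant now, Duration lease) {
        LockRow current = rows.get(name);
        if (current != null && now.isBefore(current.lockUntil)) {
            return false; // still held and not yet expired
        }
        rows.put(name, new LockRow(owner, now.plus(lease)));
        return true;
    }

    // Release the lock, but only if the caller is the current holder.
    synchronized void release(String name, String owner) {
        LockRow current = rows.get(name);
        if (current != null && current.lockedBy.equals(owner)) {
            rows.remove(name);
        }
    }

    public static void main(String[] args) {
        ExpiringLockSketch locks = new ExpiringLockSketch();
        Instant t0 = Instant.parse("2021-07-28T19:52:51.613Z");
        Duration lease = Duration.ofSeconds(25);

        System.out.println(locks.tryAcquire("task", "jvm-1", t0, lease)); // prints true
        System.out.println(locks.tryAcquire("task", "jvm-2", t0.plusSeconds(10), lease)); // prints false
        // jvm-1 dies without releasing; once the lease is over, jvm-2 gets the lock.
        System.out.println(locks.tryAcquire("task", "jvm-2", t0.plusSeconds(30), lease)); // prints true
    }
}
```

The lease trades safety for liveness: a task that runs longer than its lease can overlap with the next holder, so the lease should comfortably exceed the longest expected run.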
Hope you enjoyed this newsletter!