Spring Data JPA開啟批量更新時樂觀鎖失效問題的解決方法
樂觀鎖機制
什么是樂觀鎖?
樂觀鎖的基本思想是,認為在大多數情況下,數據訪問不會導致沖突。因此,樂觀鎖允許多個事務同時讀取和修改相同的數據,而不進行顯式的鎖定。在提交事務之前,會檢查是否有其他事務對該數據進行了修改。如果沒有沖突,則提交成功;如果發(fā)現沖突,就需要回滾并重新嘗試。
樂觀鎖通常使用版本號或時間戳來實現。每個數據項都會包含一個表示當前版本的標識符。在讀取數據時,會將版本標識符保存下來。在提交更新時,會檢查數據的當前版本是否與保存的版本匹配。如果匹配,則更新成功;否則,表示數據已被其他事務修改,需要處理沖突。
樂觀鎖適用于讀操作頻率較高、寫操作沖突較少的場景。它減少了鎖的使用,提高了并發(fā)性能,但需要處理沖突和重試的情況。
樂觀鎖是一種廣義的思想,不是某一框架或語言特有的。
樂觀鎖的優(yōu)缺點
優(yōu)點
- 增強吞吐量:由于在事務持續(xù)時間的大部分時間內沒有持有鎖,因此等待時間最少,吞吐量也是最?的。
- 最小化死鎖:死鎖是一種事務無限期地等待其他人鎖定的資源的情況,這種情況的可能性要小得多,因為數據不會長時間鎖定。
- 更好的可擴展性:隨著分布式系統(tǒng)和微服務架構的興起,樂觀鎖在確保系統(tǒng)能夠有效擴展而無需管理復雜鎖機制的開銷方面發(fā)揮著關鍵作用。
缺點
- 沖突管理開銷:在沖突頻繁的場景中,管理和解決沖突可能會占用大量資源。
- 復雜性:實現樂觀鎖需要經過深思熟慮的設計,特別是在處理失敗的事務時。
- 過時數據的可能性:由于數據在讀取時未鎖定,因此事務可能會使用過時或過時的數據,如果管理不正確,可能會導致邏輯錯誤或不一致。
JPA-樂觀鎖
概述
JPA(Java Persistence API)協議對樂觀鎖的操作做了規(guī)定:通過指定@Version字段對數據增加版本號控制,進?在更新的時候判斷版本號是否有變化。如果版本沒有變化則更新成功;如果版本有變化,就會更新失敗并拋出“OptimisticLockException”異常。我們? SQL 表示?下樂觀鎖的做法,代碼如下:
SELECT uid, name, version FROM user WHERE id = 1; UPDATE user SET name = 'jack', version = version + 1 WHERE id = 1 AND version = 1;
假設本次查詢的version=1,在更新操作時,只要version與上一個版本相同,就會更新成功,并且不會出現互相覆蓋的問題,保證了數據的原?性。
實現方法
JPA 協議規(guī)定,想要實現樂觀鎖,可以通過@Version注解標注在某個字段上?,而此字段需要是可以持久化到DB的字段,并且只?持如下四種類型:
int或Integershort或Shortlong或Longjava.sql.Timestamp
我比較推薦使用Integer類型的字段,語義比較清晰、簡單。
@Version的作用
該@Version注解用于啟用實體上的樂觀鎖,確保數據庫中的數據更新不會出現并發(fā)修改問題。當實體中的某個字段標記為@Version時,JPA 將使用該字段來跟蹤更改并確保一次只有一個事務可以更新特定行。
注意:Spring Data JPA ??有兩個@Version注解,請使?@javax.persistence.Version,?不是@org.springframework.data.annotation.Version。
它是如何工作的?
每個用注解標記的實體都@Version將由 JPA 跟蹤其版本。這是基本機制:
- 初始化:當實體第一次被持久化(保存到數據庫)時,版本字段(通常是整數或時間戳)被設置為其初始值,通常為零。
- 讀取:稍后獲取實體時,JPA 會從數據庫中檢索當前版本。
- 更新:在嘗試更新或刪除實體時,JPA 會根據實體的版本檢查數據庫中的當前版本。如果版本匹配,則操作繼續(xù),并且數據庫中的版本增加(用于更新)。
- 沖突:如果版本不匹配,則表明另一個事務同時更新了實體,導致 JPA 拋出
OptimisticLockException。
項目示例
引入依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <!-- 驅動 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 數據庫連接池 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-dbcp2</artifactId> </dependency>
項目配置
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false
username: root
password: root
jpa:
database: mysql
database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
show-sql: true
hibernate:
ddl-auto: update # 一般使用update
# create: 每次運行程序時,都會重新創(chuàng)建表,故而數據會丟失
# create-drop: 每次運行程序時會先創(chuàng)建表結構,然后待程序結束時清空表
# upadte: 每次運行程序,沒有表時會創(chuàng)建表,如果對象發(fā)生改變會更新表結構,原有數據不會清空,只會更新(推薦使用)
# validate: 運行程序會校驗數據與數據庫的字段類型是否相同,字段不同會報錯
# none: 禁用DDL處理
open-in-view: false
properties:
hibernate:
jdbc: # 開啟批量更新/寫入
batch_size: 50
batch_versioned_data: true
order_inserts: true
order_updates: true
實體添加@Version
User實體增加字段version,并添加注解@Version。當然,數據庫也要加上version字段。
@Entity
@Table(name = "TEST_USER")
public class User {
// ......
@Version
private Integer version;
// ......
}
創(chuàng)建UserInfoRepository
創(chuàng)建UserInfoRepository,?便進?DB操作
public interface UserInfoRepository extends JpaRepository<User, Long> {}
創(chuàng)建 UserInfoService
創(chuàng)建 UserInfoService,?來模擬Service的復雜業(yè)務邏輯。
public interface UserService {
/**
* 根據 UserId 產?的?些業(yè)務計算邏輯
*/
User calculate(Long userId);
}
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserRepository userRepository;
@Override
@Transactional
public User calculate(Long userId) {
User user = repository.getById(userId);
// 模擬復雜的業(yè)務計算邏輯耗時操作;
try {
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException ignored) {
}
user.setAge(user.getAge() + 1);
return userRepository.saveAndFlush(user);
}
}
其中,我們通過 @Transactional 開啟事務,并且在查詢?法后?模擬復雜業(yè)務邏輯,?來呈現多線程的并發(fā)問題。
測試方法
@ExtendWith(SpringExtension.class)
@DataJpaTest
@ComponentScan(basePackageClasses = UserServiceImpl.class)
class UserServiceTest {
@Autowired
private UserService userService;
@Autowired
private UserRepository userRepository;
@Test
void testVersion() {
// 加?條數據
User user1 = userRepository.save(User.builder().age(20).name("zzn").build());
// 驗證?下數據庫??的值
Assertions.assertEquals(0, user1.getVersion());
Assertions.assertEquals(20, user1.getAge());
userService.calculate(user1.getId());
// 驗證?下更新成功的值
User user2 = userRepository.getById(user1.getId());
Assertions.assertEquals(1, user2.getVersion());
Assertions.assertEquals(21, user2.getAge());
}
@SneakyThrows
@Test
@Rollback(false)
@Transactional(propagation = Propagation.NEVER)
void testVersionException() {
// 加?條數據
userRepository.save(User.builder().age(20).name("zzn").build());
// 模擬多線程執(zhí)?兩次
new Thread(() -> userService.calculate(1L)).start();
TimeUnit.SECONDS.sleep(1L);
// 如果兩個線程同時執(zhí)?會發(fā)?樂觀鎖異常;
Exception exception = Assertions.assertThrows(ObjectOptimisticLockingFailureException.class,
() -> userService.calculate(1L));
log.info("error info:", exception);
}
}
從上?的測試得到的結果中,我們執(zhí)?testVersion(),會發(fā)現在 save 的時候, Version會?動 +1,第?次初始化為 0;update 的時候也會附帶 Version 條件,我們通過下圖的 SQL,也可以看到 Version 的變化。

?當?我們調?testVersionException()測試?法的時候,利?多線程模擬兩個并發(fā)情況,會發(fā)現兩個線程同時取到了歷史數據,并在稍后都對歷史數據進?了更新。
由此你會發(fā)現,第?次測試的結果是樂觀鎖異常,更新不成功。
通過?志?會發(fā)現,兩個SQL同時更新的時候,Version是?樣的,是它導致了樂觀鎖異常。
注意:樂觀鎖異常不僅僅是同?個?法多線程才會出現的問題,我們只是為了?便測試?采?同?個?法;不同的?法、不同的項?,都有可能導致樂觀鎖異常。樂觀鎖的本質是 SQL 層?發(fā)?的,和使?的框架、技術沒有關系。
問題描述
一句廢話:正常情況下,一切正常!
運行環(huán)境
Java:1.8.0 SpringBoot:2.3.12.RELEASE Spring Data JPA:2.3.9.RELEASE Hibernate:5.4.32.Final Database Driver:ojdbc6 11.2.0.3 Database Platform:Oracle 10g
問題現象
上述代碼示例運行在MySQL數據庫上,一切正常,但是切換到Oracle數據庫時,不開啟批量更新模式時,也符合預期,但是開啟批量更新模式時,不符合預期:并發(fā)更新同一實體時,未拋出
ObjectOptimisticLockingFailureException異常。
| 數據庫類型 | 開啟批量 | 不開啟批量 |
|---|---|---|
| Oracle | 不生效 | 生效 |
| MySQL | 生效 | 生效 |
批量模式下,樂觀鎖異常棧:
Caused by: org.hibernate.StaleStateException: Batch update returned unexpected row count from update [0]; actual row count: 0; expected: 1; statement executed: update test_user set update_time=?, version=?, remark=? where user_id=? and version=? at org.hibernate.jdbc.Expectations$BasicExpectation.checkBatched(Expectations.java:67) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.jdbc.Expectations$BasicExpectation.verifyOutcome(Expectations.java:54) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.checkRowCounts(BatchingBatch.java:151) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.performExecution(BatchingBatch.java:126) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.doExecuteBatch(BatchingBatch.java:106) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.AbstractBatchImpl.execute(AbstractBatchImpl.java:148) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.executeBatch(JdbcCoordinatorImpl.java:198) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:633) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
非批量模式下,樂觀鎖異常棧:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [com.esunny.option.domain.user.User#990] at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:2649) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3492) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:3355) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3769) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.action.internal.EntityUpdateAction.execute(EntityUpdateAction.java:201) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
代碼分析
從以上兩種模式下的異常棧分析代碼路徑:
org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush
org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush
org.hibernate.internal.SessionImpl.flush
org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener
org.hibernate.event.internal.DefaultFlushEventListener.onFlush
org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions
org.hibernate.engine.spi.ActionQueue.executeActions
ActionQueue.executeActions邏輯如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java
/**
* Perform all currently queued actions.
*
* @throws HibernateException error executing queued actions.
*/
public void executeActions() throws HibernateException {
if ( hasUnresolvedEntityInsertActions() ) {
throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." );
}
for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) {
ExecutableList<?> l = listProvider.get( this );
if ( l != null && !l.isEmpty() ) {
executeActions( l );
}
}
}
/**
* Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list
*
* @param list The list of Executable elements to be performed
*
* @throws HibernateException
*/
private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException {
// todo : consider ways to improve the double iteration of Executables here:
// 1) we explicitly iterate list here to perform Executable#execute()
// 2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces.
try {
for ( E e : list ) {
try {
e.execute();
}
finally {
if( e.getBeforeTransactionCompletionProcess() != null ) {
if( beforeTransactionProcesses == null ) {
beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session );
}
beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess());
}
if( e.getAfterTransactionCompletionProcess() != null ) {
if( afterTransactionProcesses == null ) {
afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session );
}
afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess());
}
}
}
}
finally {
if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) {
// Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs.
// We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are
// unexpected.
Set<Serializable> propertySpaces = list.getQuerySpaces();
invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) );
}
}
list.clear();
session.getJdbcCoordinator().executeBatch();
}
這里在for循環(huán)里頭調用了e.execute(),同時在循環(huán)之后,finally之后調用了session.getJdbcCoordinator().executeBatch()
其中,EXECUTABLE_LISTS_MAP中的Executable包括:EntityInsertAction、EntityUpdateAction、EntityDeleteAction等。
Executable.execute邏輯如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/action/internal/EntityUpdateAction.java
@Override
public void execute() throws HibernateException {
final Serializable id = getId();
final EntityPersister persister = getPersister();
final SharedSessionContractImplementor session = getSession();
final Object instance = getInstance();
final boolean veto = preUpdate();
final SessionFactoryImplementor factory = session.getFactory();
Object previousVersion = this.previousVersion;
if ( persister.isVersionPropertyGenerated() ) {
// we need to grab the version value from the entity, otherwise
// we have issues with generated-version entities that may have
// multiple actions queued during the same flush
previousVersion = persister.getVersion( instance );
}
final Object ck;
if ( persister.canWriteToCache() ) {
final EntityDataAccess cache = persister.getCacheAccessStrategy();
ck = cache.generateCacheKey(
id,
persister,
factory,
session.getTenantIdentifier()
);
lock = cache.lockItem( session, ck, previousVersion );
}
else {
ck = null;
}
if ( !veto ) {
persister.update(
id,
state,
dirtyFields,
hasDirtyCollection,
previousState,
previousVersion,
instance,
rowId,
session
);
}
final EntityEntry entry = session.getPersistenceContextInternal().getEntry( instance );
if ( entry == null ) {
throw new AssertionFailure( "possible nonthreadsafe access to session" );
}
if ( entry.getStatus()==Status.MANAGED || persister.isVersionPropertyGenerated() ) {
// get the updated snapshot of the entity state by cloning current state;
// it is safe to copy in place, since by this time no-one else (should have)
// has a reference to the array
TypeHelper.deepCopy(
state,
persister.getPropertyTypes(),
persister.getPropertyCheckability(),
state,
session
);
if ( persister.hasUpdateGeneratedProperties() ) {
// this entity defines property generation, so process those generated
// values...
persister.processUpdateGeneratedProperties( id, instance, state, session );
if ( persister.isVersionPropertyGenerated() ) {
nextVersion = Versioning.getVersion( state, persister );
}
}
// have the entity entry doAfterTransactionCompletion post-update processing, passing it the
// update state and the new version (if one).
entry.postUpdate( instance, state, nextVersion );
}
final StatisticsImplementor statistics = factory.getStatistics();
if ( persister.canWriteToCache() ) {
if ( persister.isCacheInvalidationRequired() || entry.getStatus()!= Status.MANAGED ) {
persister.getCacheAccessStrategy().remove( session, ck);
}
else if ( session.getCacheMode().isPutEnabled() ) {
//TODO: inefficient if that cache is just going to ignore the updated state!
final CacheEntry ce = persister.buildCacheEntry( instance,state, nextVersion, getSession() );
cacheEntry = persister.getCacheEntryStructure().structure( ce );
final boolean put = cacheUpdate( persister, previousVersion, ck );
if ( put && statistics.isStatisticsEnabled() ) {
statistics.entityCachePut(
StatsHelper.INSTANCE.getRootEntityRole( persister ),
getPersister().getCacheAccessStrategy().getRegion().getName()
);
}
}
}
session.getPersistenceContextInternal().getNaturalIdHelper().manageSharedNaturalIdCrossReference(
persister,
id,
state,
previousNaturalIdValues,
CachedNaturalIdValueSource.UPDATE
);
postUpdate();
if ( statistics.isStatisticsEnabled() && !veto ) {
statistics.updateEntity( getPersister().getEntityName() );
}
}
調用了persister的update方法。
AbstractEntityPersister.update
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java
public boolean update(
final Serializable id,
final Object[] fields,
final Object[] oldFields,
final Object rowId,
final boolean[] includeProperty,
final int j,
final Object oldVersion,
final Object object,
final String sql,
final SharedSessionContractImplementor session) throws HibernateException {
final Expectation expectation = Expectations.appropriateExpectation( updateResultCheckStyles[j] );
final int jdbcBatchSizeToUse = session.getConfiguredJdbcBatchSize();
// IMPLEMENTATION NOTE: If Session#saveOrUpdate or #update is used to update an entity, then
// Hibernate does not have a database snapshot of the existing entity.
// As a result, oldFields will be null.
// Don't use a batch if oldFields == null and the jth table is optional (isNullableTable( j ),
// because there is no way to know that there is actually a row to update. If the update
// was batched in this case, the batch update would fail and there is no way to fallback to
// an insert.
final boolean useBatch =
expectation.canBeBatched() &&
isBatchable() &&
jdbcBatchSizeToUse > 1 &&
( oldFields != null || !isNullableTable( j ) );
if ( useBatch && updateBatchKey == null ) {
updateBatchKey = new BasicBatchKey(
getEntityName() + "#UPDATE",
expectation
);
}
final boolean callable = isUpdateCallable( j );
final boolean useVersion = j == 0 && isVersioned();
if ( LOG.isTraceEnabled() ) {
LOG.tracev( "Updating entity: {0}", MessageHelper.infoString( this, id, getFactory() ) );
if ( useVersion ) {
LOG.tracev( "Existing version: {0} -> New version:{1}", oldVersion, fields[getVersionProperty()] );
}
}
try {
int index = 1; // starting index
final PreparedStatement update;
if ( useBatch ) {
update = session
.getJdbcCoordinator()
.getBatch( updateBatchKey )
.getBatchStatement( sql, callable );
}
else {
update = session
.getJdbcCoordinator()
.getStatementPreparer()
.prepareStatement( sql, callable );
}
try {
index += expectation.prepare( update );
//Now write the values of fields onto the prepared statement
index = dehydrate(
id,
fields,
rowId,
includeProperty,
propertyColumnUpdateable,
j,
update,
session,
index,
true
);
// Write any appropriate versioning conditional parameters
if ( useVersion && entityMetamodel.getOptimisticLockStyle().isVersion()) {
if ( checkVersion( includeProperty ) ) {
getVersionType().nullSafeSet( update, oldVersion, index, session );
}
}
else if ( isAllOrDirtyOptLocking() && oldFields != null ) {
boolean[] versionability = getPropertyVersionability(); //TODO: is this really necessary????
boolean[] includeOldField = entityMetamodel.getOptimisticLockStyle().isAll()
? getPropertyUpdateability()
: includeProperty;
Type[] types = getPropertyTypes();
for ( int i = 0; i < entityMetamodel.getPropertySpan(); i++ ) {
boolean include = includeOldField[i] &&
isPropertyOfTable( i, j ) &&
versionability[i]; //TODO: is this really necessary????
if ( include ) {
boolean[] settable = types[i].toColumnNullness( oldFields[i], getFactory() );
types[i].nullSafeSet(
update,
oldFields[i],
index,
settable,
session
);
index += ArrayHelper.countTrue( settable );
}
}
}
if ( useBatch ) {
session.getJdbcCoordinator().getBatch( updateBatchKey ).addToBatch();
return true;
}
else {
return check(
session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update ),
id,
j,
expectation,
update,
sql
);
}
}
catch (SQLException e) {
if ( useBatch ) {
session.getJdbcCoordinator().abortBatch();
}
throw e;
}
finally {
if ( !useBatch ) {
session.getJdbcCoordinator().getResourceRegistry().release( update );
session.getJdbcCoordinator().afterStatementExecution();
}
}
}
catch (SQLException e) {
throw getFactory().getSQLExceptionHelper().convert(
e,
"could not update: " + MessageHelper.infoString( this, id, getFactory() ),
sql
);
}
}
關鍵之處:
useBatch的賦值邏輯
public boolean isBatchable() {
return optimisticLockStyle().isNone()
|| !isVersioned() && optimisticLockStyle().isVersion()
|| getFactory().getSessionFactoryOptions().isJdbcBatchVersionedData();
}
1. 配置了`spring.jpa.properties.hibernate.jdbc.batch_versioned_data`為true;
2. jdbcBatchSizeToUse > 1, 即`spring.jpa.properties.hibernate.jdbc.batch_size`大于0
- 如果useBatch為true
調用session.getJdbcCoordinator().getBatch(updateBatchKey).addToBatch();
這里的updateBatchKey為com.example.domain.User#UPDATE;此處僅是將PreparedStatement放入待執(zhí)行隊列。
之后便執(zhí)行session.getJdbcCoordinator().executeBatch()邏輯;請看BatchingBatch.performExecution
- 如果useBatch為false
調用session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update ),并調用check方法執(zhí)行檢查。此處檢查失敗,則會拋出樂觀鎖異常!
BatchingBatch.performExecution
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java
private void performExecution() {
LOG.debugf( "Executing batch size: %s", batchPosition );
try {
for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) {
try {
final PreparedStatement statement = entry.getValue();
final int[] rowCounts;
try {
getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart();
rowCounts = statement.executeBatch();
}
finally {
getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd();
}
checkRowCounts( rowCounts, statement );
}
catch ( SQLException e ) {
abortBatch();
throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() );
}
}
}
catch ( RuntimeException re ) {
LOG.unableToExecuteBatch( re.getMessage() );
throw re;
}
finally {
batchPosition = 0;
}
}
可以看到這里調用了statement.executeBatch(),并返回了int[] rowCounts;然后調用checkRowCounts( rowCounts, statement ); >Expectations#BasicExpectation.checkBatched 此處檢查失敗,則會拋出樂觀鎖異常!
問題原因
非批量模式下,檢查執(zhí)行結果是調用的checkNonBatched方法,該方法僅檢查更新條目數是否一致:
private void checkNonBatched(int rowCount, String statementSQL) {
if ( expectedRowCount > rowCount ) {
throw new StaleStateException(
"Unexpected row count: " + rowCount + "; expected: " + expectedRowCount
+ "; statement executed: " + statementSQL
);
}
if ( expectedRowCount < rowCount ) {
String msg = "Unexpected row count: " + rowCount + "; expected: " + expectedRowCount;
throw new TooManyRowsAffectedException( msg, expectedRowCount, rowCount );
}
}
批量模式下,檢查執(zhí)行結果是調用的checkBatched方法,檢查邏輯如下:
private void checkBatched(int rowCount, int batchPosition, String statementSQL) {
if ( rowCount == -2 ) {
LOG.debugf( "Success of batch update unknown: %s", batchPosition );
}
else if ( rowCount == -3 ) {
throw new BatchFailedException( "Batch update failed: " + batchPosition );
}
else {
if ( expectedRowCount > rowCount ) {
throw new StaleStateException(
"Batch update returned unexpected row count from update ["
+ batchPosition + "]; actual row count: " + rowCount
+ "; expected: " + expectedRowCount + "; statement executed: "
+ statementSQL
);
}
if ( expectedRowCount < rowCount ) {
String msg = "Batch update returned unexpected row count from update [" +
batchPosition + "]; actual row count: " + rowCount +
"; expected: " + expectedRowCount;
throw new BatchedTooManyRowsAffectedException( msg, expectedRowCount, rowCount, batchPosition );
}
}
}
問題便在于此!
int[] executeBatch() throws SQLException
返回值說明:
① 大于或等于零的數字,表示命令已成功處理,并且是更新計數,給出了數據庫中受命令影響的行數執(zhí)行;
② SUCCESS_NO_INFO ( -2)的值,表示命令處理成功,但受影響的行數未知;
③ 如果批量更新中的命令之一無法正確執(zhí)行,此方法引發(fā)BatchUpdateException,JDBC Driver可能會也可能不會繼續(xù)處理剩余的命令。但是Driver的行為是與特定的DBMS綁定的,要么總是繼續(xù)處理命令,要么從不繼續(xù)處理命令。如果驅動程序繼續(xù)處理,方法將返回EXECUTE_FAILED(-3)。
在實際的測試過程中發(fā)現:
| DB類型 | 是否可以返回實際影響行數 | 備注 |
|---|---|---|
| MySQL | 是 | |
| Oracle | 否 | 每個數組位置值均為-2 |
在Oracle的驅動中沒有實現該功能,即提交成功后不能返回影響行數,所以返回-2。
Oracle驅動源碼如下:oracle.jdbc.driver.OraclePreparedStatement#executeBatch
public int[] executeBatch() throws SQLException {
synchronized (this.connection) {
int[] arrayOfInt = new int[this.currentRank];
/* 此處省略N行代碼 */
if ((this.sqlKind != 1) && (this.sqlKind != 4)) {
for (i = 0; i < arrayOfInt.length; i++) {
arrayOfInt[i] = -2; // 關鍵看這行
}
}
this.connection.registerHeartbeat();
return arrayOfInt;
}
}
根據StackOverflow上的說法,Oracle 11g之前的版本,executeBatch方法返回的均是-2,eg.


解決方案
Hibernate對于這個問題有自己的處理辦法,就是設置一個jdbc和數據庫的連接屬性hibernate.jdbc.use_scrollable_resultset =true。
如果你想讓你的JDBC驅動從executeBatch()返回正確的行計數 , 那么將此屬性設為true(開啟這個選項通常是安全的). 同時,Hibernate將為自動版本化的數據使用批量DML. 默認值為false. eg. true | false
以上就是Spring Data JPA開啟批量更新時樂觀鎖失效問題的解決方法的詳細內容,更多關于Spring Data JPA樂觀鎖失效的資料請關注腳本之家其它相關文章!
相關文章
maven創(chuàng)建spark項目的pom.xml文件配置demo
這篇文章主要為大家介紹了maven創(chuàng)建spark項目的pom.xml文件配置demo,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-05-05
使用注解@Recover優(yōu)化丑陋的循環(huán)詳解
我們知道在實現一個功能的時候是可以使用不同的代碼來實現的,那么相應的不同實現方法的性能肯定也是有差別的,下面這篇文章主要給大家介紹了關于使用注解@Recover優(yōu)化丑陋的循環(huán)的相關資料,需要的朋友可以參考下2022-04-04
詳解SpringBoot中@PostMapping注解的用法
在SpringBoot中,我們經常需要編寫RESTful Web服務,以便于客戶端與服務器之間的通信,@PostMapping注解可以讓我們更方便地編寫POST請求處理方法,在本文中,我們將介紹@PostMapping注解的作用、原理,以及如何在SpringBoot應用程序中使用它2023-06-06
Maven在Windows中的配置以及IDE中的項目創(chuàng)建(圖文教程)
這篇文章主要介紹了Maven在Windows中的配置以及IDE中的項目創(chuàng)建(圖文教程),具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09
解決@ConfigurationProperties注解的使用及亂碼問題
這篇文章主要介紹了解決@ConfigurationProperties注解的使用及亂碼問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-10-10

