| @@ -24,9 +24,8 @@ data class BatchReleaseJobStatus( | |||||
| class DoReleaseCoordinatorService( | class DoReleaseCoordinatorService( | ||||
| private val deliveryOrderService: DeliveryOrderService | private val deliveryOrderService: DeliveryOrderService | ||||
| ) { | ) { | ||||
| // 可按机器/DB调优:CPU核数 * 2 或固定 8~12 | |||||
| private val poolSize = Runtime.getRuntime().availableProcessors().coerceAtLeast(4) * 2 | |||||
| private val executor = Executors.newFixedThreadPool(min(poolSize, 12)) | |||||
| private val poolSize = Runtime.getRuntime().availableProcessors() | |||||
| private val executor = Executors.newFixedThreadPool(min(poolSize, 4)) | |||||
| private val jobs = ConcurrentHashMap<String, BatchReleaseJobStatus>() | private val jobs = ConcurrentHashMap<String, BatchReleaseJobStatus>() | ||||
| fun startBatchReleaseAsync(ids: List<Long>, userId: Long): MessageResponse { | fun startBatchReleaseAsync(ids: List<Long>, userId: Long): MessageResponse { | ||||
| @@ -39,51 +38,55 @@ class DoReleaseCoordinatorService( | |||||
| val status = BatchReleaseJobStatus(jobId = jobId, total = ids.size) | val status = BatchReleaseJobStatus(jobId = jobId, total = ids.size) | ||||
| jobs[jobId] = status | jobs[jobId] = status | ||||
| // 可按压测调优 | |||||
| val chunkSize = 20 | |||||
| val chunks = ids.chunked(chunkSize) | |||||
| executor.submit { | executor.submit { | ||||
| try { | try { | ||||
| chunks.forEach { chunk -> | |||||
| val futures = chunk.map { id -> | |||||
| executor.submit<Unit> { | |||||
| var attempts = 0 | |||||
| while (attempts < 2) { | |||||
| try { | |||||
| val res = deliveryOrderService.releaseDeliveryOrder( | |||||
| ReleaseDoRequest(id = id, userId = userId) | |||||
| ) | |||||
| val code = res.code ?: "OK" | |||||
| // 成功码放宽(关键修复) | |||||
| if (code in setOf("SUCCESS", "OK", "PARTIAL_SUCCESS")) { | |||||
| status.success.incrementAndGet() | |||||
| } else { | |||||
| synchronized(status.failed) { | |||||
| status.failed.add(id to (res.message ?: "Release failed: $code")) | |||||
| } | |||||
| } | |||||
| // 打印以便核对统计 | |||||
| println("DEBUG release DO id=$id -> code=${res.code}, msg=${res.message}") | |||||
| break | |||||
| } catch (e: Exception) { | |||||
| attempts++ | |||||
| val msg = e.message ?: "" | |||||
| // 死锁重试一次(1213) | |||||
| if (attempts < 2 && msg.contains("Deadlock", ignoreCase = true)) { | |||||
| Thread.sleep(150) | |||||
| continue | |||||
| } | |||||
| synchronized(status.failed) { | |||||
| status.failed.add(id to (e.message ?: "Unknown error")) | |||||
| } | |||||
| break | |||||
| } | |||||
| println("📦 Starting serial batch release for ${ids.size} orders") | |||||
| ids.forEachIndexed { index, id -> | |||||
| try { | |||||
| val res = deliveryOrderService.releaseDeliveryOrder( | |||||
| ReleaseDoRequest(id = id, userId = userId) | |||||
| ) | |||||
| val code = res.code ?: "OK" | |||||
| println("🔍 DO $id -> code='$code', msg='${res.message}'") | |||||
| // 🔧 改进成功判定: | |||||
| // 1. 标准成功码 | |||||
| // 2. code是订单号格式(如TORU05PO25090110) | |||||
| // 3. 没有明确的错误消息 | |||||
| val isSuccess = code in setOf("SUCCESS", "OK", "PARTIAL_SUCCESS") || | |||||
| code.matches(Regex("TO[A-Z]{2}\\d{2}PO\\d+")) || // 订单号格式 | |||||
| (res.message == null && code.isNotEmpty()) // 无错误消息且有返回码 | |||||
| if (isSuccess) { | |||||
| status.success.incrementAndGet() | |||||
| } else { | |||||
| synchronized(status.failed) { | |||||
| status.failed.add(id to "Code: $code, Msg: ${res.message}") | |||||
| } | } | ||||
| println("⚠️ DO $id marked as failed: code='$code'") | |||||
| } | |||||
| if ((index + 1) % 50 == 0) { | |||||
| println("📊 Progress: ${index + 1}/${ids.size} (Success: ${status.success.get()}, Failed: ${status.failed.size})") | |||||
| } | |||||
| } catch (e: Exception) { | |||||
| synchronized(status.failed) { | |||||
| status.failed.add(id to (e.message ?: "Exception")) | |||||
| } | } | ||||
| println("❌ DO $id exception: ${e.javaClass.simpleName} - ${e.message}") | |||||
| } | |||||
| } | |||||
| println("✅ Batch completed: ${status.success.get()} success, ${status.failed.size} failed") | |||||
| // 打印前10个失败的原因 | |||||
| if (status.failed.isNotEmpty()) { | |||||
| println("🔴 Failed examples:") | |||||
| status.failed.take(10).forEach { (id, msg) -> | |||||
| println(" DO $id: $msg") | |||||
| } | } | ||||
| // 等待当前块全部完成,避免任务堆积 | |||||
| futures.forEach { f -> try { f.get() } catch (_: Exception) {} } | |||||
| } | } | ||||
| } finally { | } finally { | ||||
| status.running = false | status.running = false | ||||
| @@ -93,7 +96,7 @@ class DoReleaseCoordinatorService( | |||||
| return MessageResponse( | return MessageResponse( | ||||
| id = null, code = "STARTED", name = null, type = null, | id = null, code = "STARTED", name = null, type = null, | ||||
| message = "Batch release started", errorPosition = null, | |||||
| message = "Batch release started (serial mode)", errorPosition = null, | |||||
| entity = mapOf("jobId" to jobId, "total" to ids.size) | entity = mapOf("jobId" to jobId, "total" to ids.size) | ||||
| ) | ) | ||||
| } | } | ||||
| @@ -113,7 +116,7 @@ class DoReleaseCoordinatorService( | |||||
| "finished" to finished, | "finished" to finished, | ||||
| "success" to s.success.get(), | "success" to s.success.get(), | ||||
| "failedCount" to s.failed.size, | "failedCount" to s.failed.size, | ||||
| "failed" to s.failed.take(50), // 避免超大返回,可分页 | |||||
| "failed" to s.failed.take(50), | |||||
| "running" to s.running, | "running" to s.running, | ||||
| "progress" to progress, | "progress" to progress, | ||||
| "startedAt" to s.startedAt, | "startedAt" to s.startedAt, | ||||