From e2c66270989f82e5336421d7e5d88e4726bc69f9 Mon Sep 17 00:00:00 2001 From: "CANCERYS\\kw093" Date: Fri, 17 Oct 2025 11:42:13 +0800 Subject: [PATCH] update --- .../service/DoReleaseCoordinatorService.kt | 93 ++++++++++--------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/src/main/java/com/ffii/fpsms/modules/deliveryOrder/service/DoReleaseCoordinatorService.kt b/src/main/java/com/ffii/fpsms/modules/deliveryOrder/service/DoReleaseCoordinatorService.kt index e28bead..5727e5d 100644 --- a/src/main/java/com/ffii/fpsms/modules/deliveryOrder/service/DoReleaseCoordinatorService.kt +++ b/src/main/java/com/ffii/fpsms/modules/deliveryOrder/service/DoReleaseCoordinatorService.kt @@ -24,9 +24,8 @@ data class BatchReleaseJobStatus( class DoReleaseCoordinatorService( private val deliveryOrderService: DeliveryOrderService ) { - // 可按机器/DB调优:CPU核数 * 2 或固定 8~12 - private val poolSize = Runtime.getRuntime().availableProcessors().coerceAtLeast(4) * 2 - private val executor = Executors.newFixedThreadPool(min(poolSize, 12)) + private val poolSize = Runtime.getRuntime().availableProcessors() + private val executor = Executors.newFixedThreadPool(min(poolSize, 4)) private val jobs = ConcurrentHashMap() fun startBatchReleaseAsync(ids: List, userId: Long): MessageResponse { @@ -39,51 +38,55 @@ class DoReleaseCoordinatorService( val status = BatchReleaseJobStatus(jobId = jobId, total = ids.size) jobs[jobId] = status - // 可按压测调优 - val chunkSize = 20 - val chunks = ids.chunked(chunkSize) - executor.submit { try { - chunks.forEach { chunk -> - val futures = chunk.map { id -> - executor.submit { - var attempts = 0 - while (attempts < 2) { - try { - val res = deliveryOrderService.releaseDeliveryOrder( - ReleaseDoRequest(id = id, userId = userId) - ) - val code = res.code ?: "OK" - // 成功码放宽(关键修复) - if (code in setOf("SUCCESS", "OK", "PARTIAL_SUCCESS")) { - status.success.incrementAndGet() - } else { - synchronized(status.failed) { - status.failed.add(id to (res.message ?: "Release failed: $code")) - } - } - // 打印以便核对统计 - println("DEBUG release DO id=$id -> code=${res.code}, msg=${res.message}") - break - } catch (e: Exception) { - attempts++ - val msg = e.message ?: "" - // 死锁重试一次(1213) - if (attempts < 2 && msg.contains("Deadlock", ignoreCase = true)) { - Thread.sleep(150) - continue - } - synchronized(status.failed) { - status.failed.add(id to (e.message ?: "Unknown error")) - } - break - } + println("📦 Starting serial batch release for ${ids.size} orders") + + ids.forEachIndexed { index, id -> + try { + val res = deliveryOrderService.releaseDeliveryOrder( + ReleaseDoRequest(id = id, userId = userId) + ) + val code = res.code ?: "OK" + + println("🔍 DO $id -> code='$code', msg='${res.message}'") + + // 🔧 改进成功判定: + // 1. 标准成功码 + // 2. code是订单号格式(如TORU05PO25090110) + // 3. 没有明确的错误消息 + val isSuccess = code in setOf("SUCCESS", "OK", "PARTIAL_SUCCESS") || + code.matches(Regex("TO[A-Z]{2}\\d{2}PO\\d+")) || // 订单号格式 + (res.message == null && code.isNotEmpty()) // 无错误消息且有返回码 + + if (isSuccess) { + status.success.incrementAndGet() + } else { + synchronized(status.failed) { + status.failed.add(id to "Code: $code, Msg: ${res.message}") } + println("⚠️ DO $id marked as failed: code='$code'") + } + + if ((index + 1) % 50 == 0) { + println("📊 Progress: ${index + 1}/${ids.size} (Success: ${status.success.get()}, Failed: ${status.failed.size})") + } + } catch (e: Exception) { + synchronized(status.failed) { + status.failed.add(id to (e.message ?: "Exception")) } + println("❌ DO $id exception: ${e.javaClass.simpleName} - ${e.message}") + } + } + + println("✅ Batch completed: ${status.success.get()} success, ${status.failed.size} failed") + + // 打印前10个失败的原因 + if (status.failed.isNotEmpty()) { + println("🔴 Failed examples:") + status.failed.take(10).forEach { (id, msg) -> + println(" DO $id: $msg") } - // 等待当前块全部完成,避免任务堆积 - futures.forEach { f -> try { f.get() } catch (_: Exception) {} } } } finally { status.running = false @@ -93,7 +96,7 @@ class DoReleaseCoordinatorService( return MessageResponse( id = null, code = "STARTED", name = null, type = null, - message = "Batch release started", errorPosition = null, + message = "Batch release started (serial mode)", errorPosition = null, entity = mapOf("jobId" to jobId, "total" to ids.size) ) } @@ -113,7 +116,7 @@ class DoReleaseCoordinatorService( "finished" to finished, "success" to s.success.get(), "failedCount" to s.failed.size, - "failed" to s.failed.take(50), // 避免超大返回,可分页 + "failed" to s.failed.take(50), "running" to s.running, "progress" to progress, "startedAt" to s.startedAt,