用Java控制内部队列

有很多原因导致你应在程序中使用内部队列。

在程序中使用内部队列有很多充分的理由。大多数常见模式都包含相同的原理-将处理分为两个独立的部分,然后每个部分都可以自主工作。队列是将对象从一个线程转移到另一个线程并确保属于特定转移对象的所有字段的正确可见性的最佳方法。这种常见的模式称为“消费者-生产者”模式。


但是,今天,优锐课想更多地带大家关注潜在的故障,监视以及如何避免程序中丢失消息。


有什么问题吗?!

假设你正在5个实例上的云环境中运行微服务应用程序,这些实例通常会在几天内部署新版本。你的应用程序具有REST端点,该端点将新任务排队到内部队列中进行处理,并立即返回OK。 然后,它在另一个线程上异步处理任务。


有一天,你必须发布新版本的微服务。这很容易;如果你在云上运行,只需按一下按钮,所有内容将作为一个实例部署,而无需停机。


在单击按钮时,你可能犯了一个严重的错误。你丢失了所有排队的任务,并且可能会收到客户的很多抱怨。

实际上,有两种方法可以解决此问题:

  • 你删除了内部队列并实现了外部队列(例如RabbitMQ),直到处理结束时才确认任务已处理。如果你在部署过程中将其切断,那么可以在启动并运行新版本的应用程序时重新处理任务。

  • 你可以断开所有调用者与应用程序的连接,以停止填充内部队列,并等待所有任务处理完毕,然后触发部署。


但是,如何查看所有任务已被处理?我的内部队列中有多少个任务?几十,几百或几千?你可能不知道;很难猜测你的发布者和队列使用者之间的处理时间比例。

通常,有界队列通常趋向于已满或绝对为空,这取决于发布者和使用者之间的处理时间比例是稳定的还是时间上不稳定的。如果你的队列在某个高峰中被任务相对占据(例如,在晚上8-11点之间)并且你有足够的时间在晚上处理它们,那绝对是可以的——当然,如果你愿意牺牲单个任务的延迟。

更糟糕的是,你有无限的队列来保留未处理的任务,然后,如果发布者的速度甚至比使用者的速度稍快,那么在运行应用程序时,最终可能会遇到很大的队列。

当你运行自己的代码并且可以决定要使用哪种队列时,就是这种情况。当内部队列由应用程序中使用的任何框架处理时,你甚至可能遇到这种情况。但是,让我们关注一下一切都掌握在手中的情况,并且你有机会在最后使用的内部队列中进行一些更改。


如何建立正确的见解

让我们同意,我们需要有关内部队列的更多信息,并且我们不能仅假设将新版本的应用程序推入生产环境时我们的队列应该为空。不幸的是,没有任何方法可以公开属于JDK的队列的信息。让我们深入研究一下,尝试自己暴露一些东西。

从基础开始

第一步,我们将公开一些基本信息,这些信息在JDK的Queue接口中可用。

public interface QueueMonitor {
    ThreadPoolExecutor executor();
    /**
     * Returns {@code true} if there is any thread executing any task.
     *
     * @return {@code true} if there is any active task.
     */
    default boolean isRunning() {
        return executor().getActiveCount() > 0;
    }
    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    default int activeCount() {
        return executor().getActiveCount();
    }
    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    default long completedTasksTotal() {
        return executor().getCompletedTaskCount();
    }
    /**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    default long enqueuedTasksTotal() {
        return executor().getTaskCount();
    }
    /**
     * Returns the approximate number of tasks that are current enqueued
     * and waiting to be scheduled for execution.
     *
     * @return number of enqueued tasks.
     */
    default long enqueuedTasksCurrent() {
        return executor().getQueue().size();
    }
    /**
     * Returns the {@link Stream stream} of currently enqueued tasks
     * in an internal queue.
     *
     * @return number of enqueued tasks.
     */
    default Stream<Runnable> enqueuedTasks() {
        return executor().getQueue().stream();
    }
}

 

如果你将此组件的接口保留所有ThreadPoolExecutor并使用executor方法提供,则你会自动获取一些有关队列的基本信息,这些信息可以使用自定义REST监视器API或JMX进一步公开。这完全取决于你的服务是内部服务(不暴露给外部世界)还是你已经具有对应用程序的HTTP访问权限。如果没有,那么根据应用程序的情况和性质,JMX可能是一种更好的方法。


想知道更多

让我们更深入地查找更多信息。当前,我们能够列出所有排队的任务(未处理),并看到一些数字描述如何以及多少任务通过队列。但是,我们缺少有关当前执行的任务的信息。我们可以在其上调用某种方法以获取一些有用信息的确切对象。

/**
 * A custom trackable thread pool which can keep and provide a currently running
 * task and is able to execute {@link TrackableRunnable} which keeps useful
 * information about the current execution.
 * <p>
 * This implementation follows configuration representing
 * {@link Executors#newSingleThreadExecutor()}, the tracking will stop working
 * with multiple workers, some additional changes needed to be done
 * to support multiple workers.
 */
public class TrackableSingleThreadPoolExecutor extends ThreadPoolExecutor {
    /*
     * Task must be held as a volatile variable even in SingleThreadedExecutor.
     * - A thread is destroyed and new one is recreated when an exception is thrown and caught.
     */
    private volatile TrackableRunnable activeTask;
    private TrackableSingleThreadPoolExecutor(ThreadFactory threadFactory) {
        super(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), threadFactory);
    }
    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        if (!(runnable instanceof TrackableRunnable)) {
            throw new IllegalArgumentException("Executed task must be an instance of "
                    + TrackableRunnable.class.getSimpleName());
        }
        this.activeTask = (TrackableRunnable) runnable;
    }
    @Override
    protected void afterExecute(Runnable runnable, Throwable thread) {
        this.activeTask = null;
    }
    public TrackableRunnable getActiveTask() {
        return activeTask;
    }
    /**
     * Keeps a context with an executed runnable. We can track information
     * about currently executed task.
     */
    public static class TrackableRunnable implements Runnable {
        private final Contextual context;
        public TrackableRunnable(Contextual context) {
            this.context = context;
        }
        @Override
        public void run() {
            // Some interesting computation.
        }
        public Contextual getContext() {
            return context;
        }
    }
}

 

正如在JavaDoc中提到的那样,此实现仅支持一个工作程序。 我认为更改实现以返回保留一些上下文信息的活动任务列表并不是一件难事。


如何显示信息?

你可以使用两种简单的方法来发布它:

JMX (Java管理扩展)

  • 你只需要实现MBean并公开你要观察的内容

  • 启动MBean服务器,使其能够通过以下方式与其连接: JVisualVM或其他工具

REST Monitor API

  • 仅在运行内部应用程序时使用,否则以某种方式保护端点可能很有用:

[
  {
    "executor": "food-preparation",
    "active": "spaghetti",
    "enqueued-tasks-current": 0,
    "enqueued-tasks-total": 6,
    "completed-tasks-total": 6,
    "enqueued-tasks": [
      "pizza",
      "caesar salad",
      "cheerios"
    ]
  },
  {
    "executor": "drink-preparation",
    "active": "cuba libre",
    "enqueued-tasks-current": 0,
    "enqueued-tasks-total": 6,
    "completed-tasks-total": 6,
    "enqueued-tasks": [
      "mojito",
      "beer"
    ]
  }
]

 


如何正常关闭执行器?

这是可以帮助你在云环境中重新启动应用程序之前耗尽队列的另一种方法。通常,Kubernetes能够等待终止JVM并执行关闭钩子

你只需配置ThreadPoolExecutor#shutdown()即可在关闭挂钩中调用

Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown));

 

但是,你会遇到几个问题:

  • 终止可以延迟更长的时间,尤其是当你的无限制队列中充满任务时。

  • 你需要确保你不再接受任何任务,因为所有任务都将被执行者拒绝,并且当使用适当的RejectedExecutionHandler实现时,应指定执行者的行为。

  • 最好再保护一次任务(尤其是重要任务)。 我的意思是实现一种机制,即不确认拒绝的消息,然后将其返回到外部队列,例如,该消息可以等待新的正常实例并随后进行处理的外部队列。当通过REST API调用我们的应用程序并自动拒绝调用并丢失事务/任务时,可能会出现问题。



感谢阅读!更深入探讨欢迎留言或私信!

抽丝剥茧 细说架构那些事——【优锐课】

上一篇:利用django发送邮件


下一篇:异步IO框架:asyncio 中篇