高并发系统限流操作之令牌桶实现可变TPS控制
引言年前有个需求,批量请求供应商API,要有限流操作,并支持TPS与并发数可配置,那时候简单的查了查资料,任务结束就过去了,最近又有个限流的小需求,所以又翻出了以前的代码。
本次简单记录一下令牌桶的实现:
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
实现思路:用LinkedBlockingQueue作为装令牌的桶,Executors.newSingleThreadScheduledExecutor()作为定时器定时将令牌放入桶中,使用构建者模式的代码风格。忘了以前在哪抄的了,就这样吧。
贴上核心代码:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010 ...
JAVA并发工具设计套路
引言在学习JAVA并发工具时,分析JUC下的源码,发现有三个利器:状态、队列、CAS。
状态一般是state属性,如AQS源码中的状态,是整个工具的核心,一般操作的执行都要看当前状态是什么,由于状态是多线程共享的,所以都是volatile修饰,保证线程直接内存可见。
12345678910111213141516171819202122232425262728293031323334353637383940/** * AbstractQueuedSynchronizer中的状态 */ private volatile int state; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its succ ...
【J.U.C】线程池之实现原理
线程池状态线程池的内部状态由AtomicInteger修饰的ctl表示,其高3位表示线程池的运行状态,低29位表示线程池中的线程数量:具体看代码注释。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771 ...
【J.U.C】线程池之工作流程与ThreadPoolExecutor介绍
线程池工作流程
如果当前运行的线程少于corePoolSize,则创建新线程(核心线程)来执行任务。
如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue。
如果BlockingQueue队列已满,则创建新的线程(非核心)来处理任务。
如果核心线程与非核心线程总数超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreeadPoolExecutor1234567public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> work ...
【J.U.C】线程池之几个常用接口介绍
1、线程池常用接口介绍1.1、Executor123public interface Executor {void execute(Runnable command);}
执行提交的Runnable任务。其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。
1.2、ExecutorServiceExecutorService继承自Executor,下面挑几个方法介绍:
1.2.1、shutdown()1void shutdown();
启动有序关闭线程池,在此过程中执行先前提交的任务,但不接受任何新任务。如果线程池已经关闭,调用此方法不会产生额外的效果。此方法不等待以前提交的任务完成执行,可以使用awaitTermination去实现。
1.2.2、shutdownNow()1List<Runnable> shutdownNow();
尝试停止所有正在积极执行的任务, 停止处理等待的任务,并返回等待执行的任务列表。 此方法不等待以前提交的任务完成执行,可以使用await ...
【J.U.C】JAVA并发辅助工具类-CountDownLatch、CyclicBarrier、Semaphore之简单介绍及和Golang的WaitGroup比较
CyclicBarrier同步屏障CyclicBarrier默认的构造方法CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达屏障,然后当前线程被阻塞,直到被拦截的线程全部都到达了屏障,然后前面被阻塞的线程才能开始执行,否则会被一直阻塞。就像正式场合围着桌子吃饭,要等所有人都到到齐之后,才可以开餐。先来的人要等待。
查看栅栏类可以看到其主要是由ReentrantLock和Condition结合计数器实现。
Semaphore信号量用于控制线程的并发数量,线程只有拿到许可证的时候才能执行。其源码中通过Sync同步器实现了AQS的共享模式。因为刚刚分析完ReentrantReadWriteLock中的读锁,实现方式差不多,所以在这就不分析源码了。比喻:吃饭的时候,有一碗汤,只有两个勺子,也就是说同时只能由两个人拿到勺子成汤,其他人没拿到勺子,只能吃别的东西。
CountDownLatch为什么最后说这个类呢,因为它的功能和Golang中的WaitGroup很相似:都属于倒计时类型。在一些应用场 ...
【J.U.C】lockInterruptibly与lock
引言在看LinkedBlockingQueue源码时,发现一点疑惑:poll(long timeout, TimeUnit unit)方法中通过takeLock.lockInterruptibly()获取锁,其重载方法poll()中,通过takeLock.lock()获取锁。
通过查看ReentrantLock源码中lockInterruptibly()与lock()方法,解决上述问题。
lock()lock()调用了抽象类Sync(实现了AQS)的lock()方法,这是一个抽象方法,有两个实现,一个公平锁,一个非公平锁,最终都调用到acquire(int arg)方法,内部处理了中断。优先考虑获取锁,待获取锁成功后,才响应中断。
123456789101112131415161718192021222324252627282930313233343536/** * 获得锁。 * * 如果锁不被其他线程持有,则获取锁,并立即返回,将锁持有计数设置为1。 * * 如果当前线程已经持有锁,那么持有计数将增加1,方法立即返回。 * * 如果锁由另一个 ...
【J.U.C】LockSupport源码翻译
JDK LockSuppoprt源码翻译
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519 ...
【J.U.C】LockSupport
引言LockSupport是用于创建锁和其他同步类的阻塞原语,通过调用Unsafe函数中的接口实现阻塞和解除阻塞的。
pack
方法
解释
park(Object)
挂起当前线程
parkNanos(Object, long)
指定了一个挂起时间(相对于当前的时间),时间到后自动被唤醒;例如1000纳秒后自动唤醒
parkUntil(Object, long)
指定一个挂起时间(绝对时间),时间到后自动被唤醒;例如2018-02-12 21点整自动被唤醒。
park()
和park(Object)相比少了挂起前为线程设置blocker、被唤醒后清理blocker的操作。
parkNanos(long)
和parkNanos(Object, long)类似,仅少了blocker相关的操作
parkUntil(long)
和parkUntil(Object, long)类似,仅少了blocker相关的操作
park(Object blocker)源码分析:
12345678910111213141516171819202122232425262728293 ...
【J.U.C】LongAdder与AtomicLong
引言前一阵子看ConcurrentHashMap的源码,其中计数部分提及到LongAdder,之后看了一下LongAdder,发现他们实现的思路是一样的,都是将单节点的并发分散到多个节点。LongAdder的使用场景也就适用于高并发的情形。比如QPS的计算,在多个线程频发更新QPS的时候,效率会很高。计数选型:单线程那直接用Long最好了,并发不高的情况用AtomicLong,并发高时选LongAdder。
AtomicLong,并发都作用到了单一的volatile的value上。
1private volatile long value;
而LongAdder将并发分散到了多个Cell上,增加实现比较复杂,但是在高并发场景确实比较优秀。
1234567891011121314151617181920212223242526272829303132333435/** * Padded variant of AtomicLong supporting only raw accesses plus CAS. * * JVM intrinsics note: It would be p ...