Java APNS使用填坑
·
1.java向ios推送消息有现成的类库java APNS可以使用,github地址:https://github.com/notnoop/java-apns
2.官网上给出了使用的demo:
//Setup the connection
ApnsService service =
APNS.newService()
.withCert("/path/to/certificate.p12", "MyCertPassword")
.withSandboxDestination()
.build();
//Create and send the message
String payload = APNS.newPayload().alertBody("Can't be simpler than this!").build();
String token = "fedfbcfb....";
service.push(token, payload);
首先是构造一个ApnsService,需要传递证书,然后就是构造消息内容,然后就是发送,使用起来相当的简单。
3.当部署到线上服务器以后,发现推送消息非常慢,有时候7,8分钟才能返回,我在做推送的时候怕阻塞主线程,用了一个线程池异步来发消息的,很有可能是线程池沾满了,后续的消息就发不出去了。
jstack打印堆栈,发现了大量名为“MonitoringThread-1”的线程,而且状态还都是RUNNABLE的!
<pre name="code" class="html">"MonitoringThread-1" daemon prio=10 tid=0x00007f0d381bf800 nid=0x481a runnable [0x00007f0d147f2000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.read(InputRecord.java:480)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
- locked <0x00000000f6219d18> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
- locked <0x00000000f6222d08> (a sun.security.ssl.AppInputStream)
at com.notnoop.apns.internal.ApnsConnectionImpl$2.readPacket(ApnsConnectionImpl.java:241)
at com.notnoop.apns.internal.ApnsConnectionImpl$2.run(ApnsConnectionImpl.java:153)
at java.lang.Thread.run(Thread.java:745)
可以看出来是阻塞在了socket的read()上,先查下 MonitoringThread-1是在哪里创建的?
<pre name="code" class="java">private synchronized void sendMessage(ApnsNotification m, boolean fromBuffer) throws NetworkIOException {
logger.debug("sendMessage {} fromBuffer: {}", m, fromBuffer);
...........
while (true) {
try {
attempts++;
Socket socket = getOrCreateSocket(fromBuffer);
socket.getOutputStream().write(m.marshall());
socket.getOutputStream().flush();
cacheNotification(m);
delegate.messageSent(m, fromBuffer);
//logger.debug("Message \"{}\" sent", m);
attempts = 0;
break;
} catch (IOException e) {
.....
</pre><pre name="code" class="java"><pre name="code" class="java">private synchronized Socket getOrCreateSocket(boolean resend) throws NetworkIOException {
..........
if (socket == null || socket.isClosed()) {
try {
........
socket.setSoTimeout(readTimeout);
socket.setKeepAlive(true);
if (errorDetection) {
monitorSocket(socket);
}
............
}
return socket;
}
private void monitorSocket(final Socket socket) {
logger.debug("Launching Monitoring Thread for socket {}", socket);
Thread t = threadFactory.newThread(new Runnable() {
final static int EXPECTED_SIZE = 6;
@SuppressWarnings("InfiniteLoopStatement")
@Override
public void run() {
logger.debug("Started monitoring thread");
try {
InputStream in;
try {
in = socket.getInputStream();
} catch (IOException ioe) {
in = null;
}
byte[] bytes = new byte[EXPECTED_SIZE];
while (in != null && readPacket(in, bytes)) {
................
}
}
}
public ApnsConnectionImpl(SocketFactory factory, String host, int port, Proxy proxy, String proxyUsername, String proxyPassword,
ReconnectPolicy reconnectPolicy, ApnsDelegate delegate, boolean errorDetection, ThreadFactory tf, int cacheLength,
boolean autoAdjustCacheLength, int readTimeout, int connectTimeout) {
this.factory = factory;
this.host = host;
this.port = port;
this.reconnectPolicy = reconnectPolicy;
this.delegate = delegate == null ? ApnsDelegate.EMPTY : delegate;
this.proxy = proxy;
this.errorDetection = errorDetection;
this.threadFactory = tf == null ? defaultThreadFactory() : tf;
this.cacheLength = cacheLength;
this.autoAdjustCacheLength = autoAdjustCacheLength;
this.readTimeout = readTimeout;
this.connectTimeout = connectTimeout;
this.proxyUsername = proxyUsername;
this.proxyPassword = proxyPassword;
cachedNotifications = new ConcurrentLinkedQueue<ApnsNotification>();
notificationsBuffer = new ConcurrentLinkedQueue<ApnsNotification>();
}
private ThreadFactory defaultThreadFactory() {
return new ThreadFactory() {
ThreadFactory wrapped = Executors.defaultThreadFactory();
@Override
public Thread newThread( Runnable r )
{
Thread result = wrapped.newThread(r);
result.setName("MonitoringThread-"+threadId.incrementAndGet());
result.setDaemon(true);
return result;
}
};
}
这里我们看到了“MonitoringThread-1”的来源,可以看出来,只要设置了errorDetection参数,如果socket是null或者没有连上,就会新起一个线程来读响应,我们在调用的时候每次使用完毕就把service给close掉了,导致每次发消息socket都会重建,就会多出一个线程来,并且线程的id始终都是1.只要把errorDetection设置为false就可以了。</p><p>4.在ApnsService的build的时候,我们发现了另一个有用的参数:isQueued:
<pre name="code" class="java">public ApnsService build() {
...............
service = new ApnsServiceImpl(conn, feedback);
if (isQueued) {
service = new QueuedApnsService(service, queueThreadFactory);
}
if (isBatched) {
service = new BatchApnsService(conn, feedback, batchWaitTimeInSec, batchMaxWaitTimeInSec, batchThreadFactory);
}
service.start();
return service;
}
我们默认创建的ApnsServiceImpl,通过这个参数可以设置为QueuedApnsService,这个就比较有用了,发消息的时候,网消息队列里扔进入就ok了,正是我们所需要的:
public class QueuedApnsService extends AbstractApnsService {
private static final Logger logger = LoggerFactory.getLogger(QueuedApnsService.class);
private ApnsService service;
private BlockingQueue<ApnsNotification> queue;
private AtomicBoolean started = new AtomicBoolean(false);
public QueuedApnsService(ApnsService service) {
this(service, null);
}
public QueuedApnsService(ApnsService service, final ThreadFactory tf) {
super(null);
this.service = service;
this.queue = new LinkedBlockingQueue<ApnsNotification>();
this.threadFactory = tf == null ? Executors.defaultThreadFactory() : tf;
this.thread = null;
}
@Override
public void push(ApnsNotification msg) {
if (!started.get()) {
throw new IllegalStateException("service hasn't be started or was closed");
}
queue.add(msg);
}
private final ThreadFactory threadFactory;
private Thread thread;
private volatile boolean shouldContinue;
public void start() {
if (started.getAndSet(true)) {
// I prefer if we throw a runtime IllegalStateException here,
// but I want to maintain semantic backward compatibility.
// So it is returning immediately here
return;
}
service.start();
shouldContinue = true;
thread = threadFactory.newThread(new Runnable() {
public void run() {
while (shouldContinue) {
try {
ApnsNotification msg = queue.take();
service.push(msg);
} catch (InterruptedException e) {
// ignore
} catch (NetworkIOException e) {
// ignore: failed connect...
} catch (Exception e) {
// weird if we reached here - something wrong is happening, but we shouldn't stop the service anyway!
logger.warn("Unexpected message caught... Shouldn't be here", e);
}
}
}
});
thread.start();
}
public void stop() {
started.set(false);
shouldContinue = false;
thread.interrupt();
service.stop();
}
@Override
public Map<String, Date> getInactiveDevices() throws NetworkIOException {
return service.getInactiveDevices();
}
public void testConnection() throws NetworkIOException {
service.testConnection();
}
}
因此,最终我们构造ApnsService的时候是类似于这样的:
service = APNS.newService().withCert(cert, password).withSandboxDestination().asQueued().withNoErrorDetection().build();
更多推荐
已为社区贡献3条内容
所有评论(0)