Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
925 views
in Technique[技术] by (71.8m points)

session - spring integration | See threads stuck in RUNNABLE for a long time when connecting via sftp inbound adapters

We are using spring integration dynamic sftp flows to ingest sftp files . The flow java config looks like below

   from(Sftp.inboundAdapter(cachingSessionFactory, (a, b) -> Long.valueOf(a.lastModified())
            .compareTo(b.lastModified()))//
            .preserveTimestamp(true)//
            .remoteDirectory(job.getRemoteDirectory())//
            .deleteRemoteFiles(job.getDeleteRemoteFiles())//
            .filter(this.compositeRemoteFilter(job))//
            .autoCreateLocalDirectory(true)//
            .preserveTimestamp(true)//
            .maxFetchSize(maxMessagesPerPoll)
            .localFilter(new LocalFileFilter(job))//
            .localDirectory(localDirectory)),
            e -> e.id("testComponent")
                  .autoStartup(false)//
                  .poller(Pollers.cron(job.getPollingFreq(), job.timeZone())//
                        .maxMessagesPerPoll(maxMessagesPerPoll)
                        .receiveTimeout(1000L)    
                        .handle(UploadHandler)

The caching session factory is something we get dynamically via using a delegate . Most of it works fine but sometimes after running for days we observe some threads stuck in RUNNABLE . Our assumption was if the jsch session was stuck in any way it should eventually come out as we have timeouts both at the session factory level and at the poller .

The dump for the thread looks something like this


java.io.FileInputStream.readBytes(Native Method)java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

Please help if we are missing anything here or if there is some configuration we can do on the si side to fix this . SI version 5.1.13

Another heap dump trace of thread

"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader  org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable>  com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals  java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable>  java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable>  org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable>  org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable>  org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group  java.lang.ThreadGroup","656","48","2"
"<local variable>  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable>  org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals  java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext  java.security.AccessControlContext","88","40","2"
"name  java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable>  org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable>  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable>  org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable>  org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable>  java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable>  org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool  org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch  com.jcraft.jsch.JSch","736","32","4"
"proxy  custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig  java.util.Properties size = 2","176","48","4"
"sharedSessionLock  java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"<class>  custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password  java.lang.String ""@#@#@#@#""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"enableDaemonThread  java.lang.Boolean = false","16","16","4"
"serverAliveCountMax  java.lang.Integer = 4  0x00000004","16","16","4"
"serverAliveInterval  java.lang.Integer = 240,000  0x0003A980","16","16","4"
"timeout  java.lang.Integer = 120,000  0x0001D4C0","16","16","4"
"userInfoWrapper  org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper&qu

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

Your proxy is blocking on STDIN.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...