我是靠谱客的博主 妩媚唇膏,这篇文章主要介绍Apache Camel之FTP组件学习写在最前面一、Apache Camel是什么东西?二、官网FTP API 翻译整理三、具体功能实现,现在分享给大家,希望可以做个参考。

写在最前面

哎,最近提了离职,手头的活也基本上清理的差不多了。想着这个把月可以舒服的晃悠晃悠的离开,但是运维的小伙伴总是不架势,走之前还是提了个新需求。

先说下需求吧,我们的系统概括的讲就是一个接口系统,对外的方式无外乎三种,MQ、WEBSERVICE以及FTP了。因为FTP的业务是前人留下来东西,而它恰好一直不出问题,逻辑也比较复杂,所以一直都懒得看里面的内容,只是初步的知道是用的Apache Camel的ftp的路由。

这次运维同学提出的需求正好就是关于ftp的,想着离职还要一段时间,索性就研究这东西吧。因为我也是初步接触,有不对的地方还请指正。具体的需求就是,我们的系统需要监控远程ftp上的ZIP包文件,将ZIP包下载到本地解压,然后解析导入数据库中。ps:类似JMS监控MQ。

一、Apache Camel是什么东西?

Apache Camel 的官网是http://camel.apache.org 。它是一个通用的基于已知的开源集成框架企业集成模式。简单的讲,Apache Camel就是集成现有的一系列中间件架构,如HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF等,实现传输协议和消息格式的转换等等。看起来功能还挺强大,不过大部分可能在垂直领域有更好的解决方案。本文只说一下FTP的这个路由组件。(这段话可能说的不是太清楚,我自己也是迷迷糊糊,有能力的同学自己看下官网的解释吧)

二、官网FTP API 翻译整理

来源:http://camel.apache.org/ftp

2.1 关于jar包

复制代码
1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ftp</artifactId> <version>2.13.1</version> </dependency>

这个是我们项目中用的Camel版本,比较老了,最新的是2.19.2

复制代码
1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ftp</artifactId> <version>2.19.2</version> </dependency>

 2.2 URI的格式

ftp://[username@]hostname[:port]/directoryname[?options]

sftp://[username@]hostname[:port]/directoryname[?options]

ftps://[username@]hostname[:port]/directoryname[?options]

下面这个链接是我本地测试使用的,可以参考下,具体参数的意义请往下看。

复制代码
1
ftp://173.5.206.53:2121/MHE/?username=ftpAdmin&amp;password=123456&amp;binary=true&amp;passiveMode=true&amp;delete=true&amp;delay=60000

其中directoryname表示文件夹目录。目录名称是一个相对路径。不支持绝对路径。相对路径可以包含嵌套文件夹,例如 /inbox/us。

对于Camel2.16之前的版本,directoryName必须是已经存在的,这个配置不支持autoCreate选项(自动创建哪个文件夹)。原因是FTP服务器上有权限限制。

对于Camel2.16,是支持autoCreate选项的。当消费者启动,在执行轮询计划之前,会自动到FTP服务器创建对应的文件目录。默认值autoCreate是true.

如果没有提供用户名,会使用anonymous匿名登录,密码会随机尝试。
如果没有提供端口号,Camel将根据协议提供缺省值(ftp sftp = ftp = 21日22日= 2222)。

你可以添加以下格式的URI查询配置, ?option=value&option=value&...  如上文我的测试链接。&amp;是我在pom文件中转译使用的。

这个配置使用两个不同的库操作FTP。FTP和FTPS使用Apache Commons Net,而SFTP使用JCraft JSCH。

FTPS组件仅可在Camel2.2及更新的版本。
FTPS(也称为安全FTP)是一个扩展添加支持FTP传输层安全性(TLS)和安全套接字层(SSL)加密协议。

ps:因为我目前使用的是2.13.1,关于新版本的特性我也不是太了解。

2.3 URI的各种配置选项

Name

Default Value

Description

username

null

指定要使用的用户名登录远程服务器。

password

null

指定的密码用来登录到远程服务器。

account

nullCamel 2.15.2: 指定的帐户用于登录到远程FTP服务器(只对FTP和FTPS)

binary

false

指定文件传输模式,二进制或ASCII。默认是ASCII(false)。

disconnect

false

Camel 2.2: 使用后是否要断开远程FTP服务器。可用于消费者和生产者。只会断开断开当前连接到FTP服务器。如果你有一个你想要停止消费,那么你需要停止消费者路由。

localWorkDirectory

null

在使用时,可以使用本地工作目录将远程文件内容直接存储在本地文件中,以避免将内容加载到内存中。这个是有好处的,如果你下载一个非常大的远程文件,可以节省内存。详情见下文。

passiveMode

false

FTP and FTPS only: 指定是否使用被动模式连接。默认是false。

download

true

Camel 2.11: FTP消费者是否应该下载该文件。如果将此选项设置为false,那么消息体将null,但消费者仍将触发一个Camel Exchange获得文件的详细信息,如文件名,文件大小,等等。只是不能下载的文件。

streamDownload

false

Camel 2.11: 消费者是否应该下载整个文件前,默认的行为,或者是否应该通过InputStream从路由远程资源读取而不是从内存中的Camel Exchange数组获取。如果下载失败的或是本地目录提供,这个选项可以忽略。此选项对于处理大型远程文件非常有用。

execProt

null

Camel 2.4: FTPS only: 默认情况下,如果没有禁用安全数据通道默认值,则使用选项p。可能的值是:
C: Clear 
S: Safe (SSL protocol only) 
E: Confidential (SSL protocol only) 
P: Private

execPbsz

null

Camel 2.4: FTPS only: 此选项指定安全数据通道的缓冲区大小。如果选择useSecureDataChannel,但是没有被显式设置,就没有使用价值了。

fastExistsCheck

false

Camel 2.8.2, 2.9: 如果将此选项设置为true,camel-ftp将直接使用列表文件检查文件是否存在。由于一些FTP服务器可能不支持直接列出文件,如果选项是错误的,camel-ftp将使用旧的方法列出目录并检查文件是否存在。注意从Camel 2.10.1起这个选项也影响readLock=changed控制是否执行一个快速检查更新文件信息。如果FTP服务器有大量文件,则可以使用它来加快进程。

reconnectDelay

1000

Delay in millis Camel will wait before performing a reconnect attempt.

connectTimeout

10000

Camel 2.4: Is the connect timeout in millis. This corresponds to using ftpClient.connectTimeoutfor the FTP/FTPS. For SFTP this option is also used when attempting to connect.

soTimeout

null / 30000

FTP and FTPS Only: Camel 2.4: Is the SocketOptions.SO_TIMEOUT value in millis. A good idea is to configure this to a value such as 300000 (5 minutes) to not hang a connection. On SFTP this option is set as timeout on the JSCH Session instance.

Also SFTP from Camel 2.14.3/2.15.3/2.16 onwards.

From Camel 2.16 onwards the default is 300000 (300 sec).

timeout

30000

FTP and FTPS Only: Camel 2.4: Is the data timeout in millis. This corresponds to usingftpClient.dataTimeout for the FTP/FTPS. For SFTP there is no data timeout.

chmod

null

SFTP Producer Only: Camel 2.9: 允许你设置chmod存储文件。例如 chmod=640.

receiveBufferSize

32768FTP/FTPS Only: Camel 2.15.1: 下载文件的缓冲区大小。默认大小是32 kb。

ftpClient

null

FTP and FTPS Only: Camel 2.1: Allows you to use a customorg.apache.commons.net.ftp.FTPClient instance.

disconnectOnBatchCompletefalseCamel 2.18: 是否在批处理完成后从远程FTP服务器断开连接。可用于消费者和生产者。断开连接只会断开与FTP服务器的当前连接。如果你有一个想要停止的消费者,那么你需要停止消费者路由
activePortRange Camel 2.18: 在主动模式设置客户端端口范围。语法是:minPort-maxPort。端口号都是包括的,如10000 - 19999包括所有xxxx端口。

2.4 Camel默认的一些配置

FTP消费者在默认情况下将消耗远程FTP服务器上的文件。你必须显式地配置读取后如何处理这个文件,如果你想要删除的文件或移动到另一个位置。例如你可以使用delete=true删除的文件,或者使用move=.done将文件移动到一个隐藏的子目录。

常规的文件消费是不同的,因为它将默认移动文件.camel子目录。Camel不做FTP默认的处理行为是它可能缺少权限,默认情况下无法移动或删除文件。

三、具体功能实现

先放一个小DEMO

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/** * Apache Camel FTP Demo * @author 小卖铺的老爷爷 */ public class HelloWorld extends RouteBuilder {   //启动FTP路由,实际项目中初始化应该是单独的一个类 public static void main(String[] args) throws Exception { // 这是camel上下文对象,整个路由的驱动全靠它了。 ModelCamelContext camelContext = new DefaultCamelContext(); // 启动route camelContext.start(); // 将我们的路由处理加入到上下文中 camelContext.addRoutes(new HelloWorld()); } @Override public void configure() throws Exception { //从FTP上下载文件到本地目录,相关参数的意义,参考我上文贴出的API,实际项目中这些地址一般写在配置文件中 from("ftp://173.5.206.53:2121/MHE/?username=ftpAdmin&password=123456&binary=true&passiveMode=true&delete=true&delay=60000") //自定义的处理器,可以做各种逻辑处理,如文件名匹配下载等 .process(new HttpProcessor()) .to("file:d:/wms-fe/inFile"); } }

上文的demo是每隔一分钟扫描ftp服务器目录上是否有新文件,如果有匹配文件名复核条件的下载到本地,并将服务器上的文件删除。当然我们也可以不删除,将文件移到一个固定目录备份,这里可以用move=XXX参数。.process(new HttpProcessor()) 其中这句是可以删除的,删除后就是只有有文件,Camel就会下载到本地了。

可能小Demo中代码不是很齐全,因为Processor的处理逻辑的代码可能有点问题,被我删掉了。

下面看下我实际项目里面的处理逻辑吧,

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/** * * 此类描述的是:FTP下载、加压、解析处理
*
*/ public class FTPMheRoute extends RouteBuilder { private String ftpDownURI; private String downDir; private String unpackDir; private String ftpFileCharset; private void initialize() { ftpDownURI = WmsFEUtil.getSysConfigValue("camel.ftp.download.uri").trim(); downDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.dir").trim(); unpackDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir").trim(); ftpFileCharset = WmsFEUtil.getSysConfigValue("camel.ftp.file.charset"); } // @Override public void configure() throws Exception { initialize(); downRoute(); unpackRoute(); parseRoute(); } /** * 此方法描述的是:下载路由 */ private void downRoute() { from(ftpDownURI).to(downDir).process(new MHEDownloadedProcessor()); // from(ftpDownURI) // .choice() // .when(new FTPFilter(ftpFileCharset, FTPFilter.TYPE_FTP_FILE_NORMAL)) // .process(new FTPProcessor(ftpFileCharset)) // .to(downDir); //.process(new MHEDownloadedProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ " when find the zip file it will download to "+downDir ); } /** * 此方法描述的是:解压路由 */ private void unpackRoute() { from(downDir).process(new MHEDownUnpackProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ "camel listner @ "+ downDir , " when find the zip file it will unpack to "+unpackDir ); } /** * 此方法描述的是:解析路由 */ private void parseRoute() { from(unpackDir).process(new MHEDownExecuteProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ "camel listner @ "+ unpackDir , " when find the txt file it will parse to DB." ); } }

 上面的代码中,我使用三个路由连接,上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置(http://camel.apache.org/direct.html)。注意,两个被连接的路由一定要是可用的,并且存在于同一个Camel服务中。

最后说一下Process,它用于接收从控制端点、路由选择条件又或者另一个处理器的Exchange中传来的消息信息,并进行处理。这里process(Exchange exchange)方法是必须进行实现的。

因为某些原因,这里我只贴下我解压路由的处理逻辑。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/** * * 此类描述的是:FTP解压处理 */ public class MHEDownUnpackProcessor implements Processor { private static final String TXT_PATTERN = ".*?\.txt"; private static final String ZIP_PATTERN = ".*?\.zip"; private static final String BUSINESSTYPE_FTPMHE = "FTPMHE"; @Override public void process(Exchange exchange) throws Exception { Message message = exchange.getIn(); GenericFile<?> gf = (GenericFile<?>)message.getBody(); File zipFile = (File) gf.getFile(); unzip(zipFile); } /** * 此方法描述的是:解压缩文件 * @throws IOException */ private List<File> unzip(File zipFile) throws IOException { String zipName = zipFile.getName(); if (Pattern.matches(ZIP_PATTERN, zipName)){ List<File> list = ZipUtil.unzip(zipFile, getDownUnpackDir(), TXT_PATTERN); if (list == null){ BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解压失败,文件内容为空"); ParseFileTools.move2Dir(zipFile, "fail"); }else{ //FileUtil.deleteFile(zipFile); ParseFileTools.move2Dir(zipFile, "success"); } return list; }else{ BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解压失败,压缩包为非法格式"); ParseFileTools.move2Dir(zipFile, "fail"); } return null; } /** * 此方法描述的是:获得解压临时目录 */ public static String getDownUnpackDir() { return getURI(WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir")); } /** * 此方法描述的是:获得文件路径 */ public static String getURI(String fileURI) { String uri = fileURI.replaceFirst("file://", ""); int lastParam = uri.lastIndexOf("?"); if (lastParam != -1){ uri = uri.substring(0, lastParam); } return uri; } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
public class ZipUtil { /** * 此方法描述的是:ZIP压缩 * @param source 源文件 * @param dest 压缩文件 * @version: 2015年3月2日 上午9:17:26 */ public static String zip(File source, File dest) { ZipOutputStream out = null; BufferedOutputStream bo = null; try{ File zipParent = dest.getParentFile(); if (!zipParent.exists()){ zipParent.mkdirs(); } out = new ZipOutputStream(new FileOutputStream(dest)); bo = new BufferedOutputStream(out); zip(out, source, source.getName(), bo); return dest.getAbsolutePath(); }catch(Exception e){ OutUtil.error(e,e.getMessage()); }finally{ if (bo!=null){ try { bo.close(); } catch (IOException e) { } } if (out!=null){ try { out.close(); // 输出流关闭 } catch (IOException e) { } } } return null; } public static void zip(ZipOutputStream out, File f, String base,BufferedOutputStream bo) throws Exception { // 方法重载 out.putNextEntry(new ZipEntry(base)); // 创建zip压缩进入点base FileInputStream in = new FileInputStream(f); BufferedInputStream bi = new BufferedInputStream(in); int b; try{ while ((b = bi.read()) != -1) { bo.write(b); // 将字节流写入当前zip目录 } }finally{ bi.close(); in.close(); // 输入流关闭 bo.close(); } } /** 解压缩(压缩文件中包含多个文件)可代替上面的方法使用。 * ZipInputStream类 * 当我们需要解压缩多个文件的时候,ZipEntry就无法使用了, * 如果想操作更加复杂的压缩文件,我们就必须使用ZipInputStream类 * */ public static List<File> unzip(File sourceFile ,String outPath,String fileNameRegexp){ List<File> listDestFile = new ArrayList<File>(); File outFile = null; ZipFile zipFile = null; FileInputStream sourceInput = null; ZipInputStream zipInput = null; ZipEntry entry = null; InputStream input = null; OutputStream output = null; String sourceName = sourceFile.getName(); try { zipFile = new ZipFile(sourceFile); sourceInput = new FileInputStream(sourceFile); zipInput = new ZipInputStream(sourceInput); while((entry = zipInput.getNextEntry()) != null){ String entryName = entry.getName(); if (!Pattern.matches(fileNameRegexp, entryName)){ continue ; } outFile = new File(outPath + File.separator + sourceName+"-"+entryName); if(!outFile.getParentFile().exists()){ outFile.getParentFile().mkdir(); } if(!outFile.exists()){ outFile.createNewFile(); } input = zipFile.getInputStream(entry); output = new FileOutputStream(outFile); int temp = 0; while((temp = input.read()) != -1){ output.write(temp); } input.close(); output.close(); listDestFile.add(outFile); } return listDestFile; } catch (Exception e) { e.printStackTrace(); } finally{ try { if (input!=null) input.close(); if (output!=null) output.close(); if (zipInput!=null) zipInput.close(); if (sourceInput!=null) sourceInput.close(); if (zipFile!=null) zipFile.close(); } catch (IOException e) {} } return null; } /** * 此方法描述的是:获得ZIP文件同名解压目录 * @version: 2015年3月5日 下午2:10:41 */ public static String getUnpackForder(File zipFile) { String filePath = zipFile.getAbsolutePath(); return filePath.substring(0, filePath.lastIndexOf(".")); } /** * 此方法描述的是:获得ZIP文件指定解压目录 * @version: 2015年3月6日 下午10:28:36 */ public static String getUnpackForder(File zipFile, String subDir) { return zipFile.getParent()+File.separator+subDir; } }

 基本上就是上面这些,贴出的代码只具有参考意义。最后目前还有一个问题,在解压和解析处理的时候,我做了备份,成功的会移到succ文件夹,失败的会移到fail文件夹,但是.camel默认生成的文件夹虽然没什么用还是一直存在,不知道该如何禁止这个文件夹的生成。还请玩过Camel的大牛指导下。谢谢。

参考:

http://camel.apache.org

http://camel.apache.org/ftp.html

http://blog.csdn.net/column/details/sys-communication.html

转载于:https://www.cnblogs.com/laoyeye/p/7491873.html

最后

以上就是妩媚唇膏最近收集整理的关于Apache Camel之FTP组件学习写在最前面一、Apache Camel是什么东西?二、官网FTP API 翻译整理三、具体功能实现的全部内容,更多相关Apache内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(84)

评论列表共有 0 条评论

立即
投稿
返回
顶部