1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.oodt.cas.filemgr.datatransfer;
19
20
21 import org.apache.oodt.cas.filemgr.structs.Product;
22 import org.apache.oodt.cas.filemgr.structs.Reference;
23 import org.apache.oodt.cas.filemgr.structs.exceptions.ConnectionException;
24 import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException;
25 import org.apache.oodt.cas.filemgr.system.XmlRpcFileManagerClient;
26
27
28 import java.io.File;
29 import java.io.FileInputStream;
30 import java.io.FileOutputStream;
31 import java.io.IOException;
32 import java.net.URI;
33 import java.net.URISyntaxException;
34 import java.net.URL;
35 import java.util.Iterator;
36 import java.util.logging.Level;
37 import java.util.logging.Logger;
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class RemoteDataTransferer implements DataTransfer {
52
53
54
55
56
57 private URL fileManagerUrl = null;
58
59
60
61
62 private int chunkSize = 1024;
63
64
65 private XmlRpcFileManagerClient client = null;
66
67
68 private static final Logger LOG = Logger
69 .getLogger(RemoteDataTransferer.class.getName());
70
71
72
73
74 public RemoteDataTransferer(int chunkSz) {
75 this.chunkSize = chunkSz;
76 }
77
78
79
80
81
82
83
84
85 public void setFileManagerUrl(URL url) {
86 try {
87 client = new XmlRpcFileManagerClient(url);
88 this.fileManagerUrl = url;
89 LOG.log(Level.INFO, "Remote Data Transfer to: ["
90 + client.getFileManagerUrl().toString() + "] enabled");
91 } catch (ConnectionException e) {
92 LOG.log(Level.WARNING, "Connection exception for filemgr: [" + url
93 + "]");
94 }
95 }
96
97
98
99
100
101
102
103
104 public void transferProduct(Product product) throws DataTransferException,
105 IOException {
106
107 if (fileManagerUrl == null) {
108 throw new DataTransferException(
109 "No file manager url specified for remote data transfer: cannot transfer product: ["
110 + product.getProductName() + "]!");
111 }
112
113 quietNotifyTransferProduct(product);
114
115
116 for (Iterator<Reference> i = product.getProductReferences().iterator(); i
117 .hasNext();) {
118 Reference r = i.next();
119
120 File refFile = null;
121 try {
122 refFile = new File(new URI(r.getOrigReference()));
123 } catch (URISyntaxException e) {
124 LOG.log(Level.WARNING,
125 "Unable to test if reference: [" + r.getOrigReference()
126 + "] is a directory: skipping it");
127 continue;
128 }
129
130 if (!refFile.isDirectory()) {
131 LOG.log(Level.FINE, "Reference: [" + r.getOrigReference()
132 + "] is file: transferring it");
133
134 try {
135 remoteTransfer(r, product);
136 } catch (URISyntaxException e) {
137 LOG.log(Level.WARNING,
138 "Error transferring file: [" + r.getOrigReference()
139 + "]: URISyntaxException: " + e.getMessage());
140 }
141 } else {
142 LOG.log(
143 Level.FINE,
144 "RemoteTransfer: skipping reference: ["
145 + refFile.getAbsolutePath() + "] of product: ["
146 + product.getProductName() + "]: ref is a directory");
147 }
148 }
149
150 quietNotifyProductTransferComplete(product);
151
152 }
153
154
155
156
157
158
159
160
161 public void retrieveProduct(Product product, File directory)
162 throws DataTransferException, IOException {
163 for (Reference reference : product.getProductReferences()) {
164 FileOutputStream fOut = null;
165 try {
166 File dataStoreFile = new File(new URI(
167 reference.getDataStoreReference()));
168 File dest = new File(directory, dataStoreFile.getName());
169 fOut = new FileOutputStream(dest, false);
170 LOG.log(
171 Level.INFO,
172 "RemoteDataTransfer: Copying File: " + "fmp:"
173 + dataStoreFile.getAbsolutePath() + " to " + "file:"
174 + dest.getAbsolutePath());
175 byte[] fileData = null;
176 int offset = 0;
177 while (true) {
178 fileData = (byte[]) client.retrieveFile(
179 dataStoreFile.getAbsolutePath(), offset, 1024);
180 if (fileData.length <= 0)
181 break;
182 fOut.write(fileData);
183 if (fileData.length < 1024)
184 break;
185 offset += 1024;
186 }
187 } catch (Exception e) {
188 throw new DataTransferException("", e);
189 } finally {
190 try {
191 fOut.close();
192 } catch (Exception e) {
193 }
194 }
195 }
196 }
197
198 private void remoteTransfer(Reference reference, Product product)
199 throws URISyntaxException {
200
201 File origFile = new File(new URI(reference.getOrigReference()));
202 File destFile = new File(new URI(reference.getDataStoreReference()));
203 String origFilePath = origFile.getAbsolutePath();
204 String destFilePath = destFile.getAbsolutePath();
205
206
207
208 byte[] buf = new byte[chunkSize];
209
210 FileInputStream is = null;
211
212 try {
213 is = new FileInputStream(origFile);
214 int offset = 0;
215 int numBytes = 0;
216
217
218
219 if (!client.removeFile(destFilePath)) {
220 LOG.log(Level.WARNING,
221 "RemoteDataTransfer: attempt to perform overwrite of dest file: ["
222 + destFilePath + "] failed");
223 }
224
225 while ((numBytes = is.read(buf, offset, chunkSize)) != -1) {
226 client.transferFile(destFilePath, buf, offset, numBytes);
227 }
228 } catch (IOException e) {
229 LOG.log(Level.WARNING,
230 "Error opening input stream to read file to transfer: Message: "
231 + e.getMessage());
232 return;
233 } catch (DataTransferException e) {
234 LOG.log(
235 Level.WARNING,
236 "DataTransferException when transfering file: [" + origFilePath
237 + "] to [" + destFilePath + "]: Message: "
238 + e.getMessage());
239 return;
240 } finally {
241 if (is != null) {
242 try {
243 is.close();
244 } catch (Exception ignore) {
245 }
246
247 is = null;
248 }
249 }
250 }
251
252 private void quietNotifyTransferProduct(Product p) {
253 try {
254 client.transferringProduct(p);
255 } catch (DataTransferException e) {
256 e.printStackTrace();
257 LOG.log(Level.WARNING,
258 "Error notifying file manager of product transfer initiation for product: ["
259 + p.getProductId() + "]: Message: " + e.getMessage());
260 return;
261 }
262 }
263
264 private void quietNotifyProductTransferComplete(Product p) {
265 try {
266 client.removeProductTransferStatus(p);
267 } catch (DataTransferException e) {
268 e.printStackTrace();
269 LOG.log(Level.WARNING,
270 "Error notifying file manager of product transfer completion for product: ["
271 + p.getProductId() + "]: Message: " + e.getMessage());
272 return;
273 }
274 }
275
276 }