View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.oodt.cas.filemgr.datatransfer;
19  
20  //OODT imports
21  import org.apache.oodt.cas.filemgr.structs.Product;
22  import org.apache.oodt.cas.filemgr.structs.Reference;
23  import org.apache.oodt.cas.filemgr.structs.exceptions.ConnectionException;
24  import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException;
25  import org.apache.oodt.cas.filemgr.system.XmlRpcFileManagerClient;
26  
27  //JDK imports
28  import java.io.File;
29  import java.io.FileInputStream;
30  import java.io.FileOutputStream;
31  import java.io.IOException;
32  import java.net.URI;
33  import java.net.URISyntaxException;
34  import java.net.URL;
35  import java.util.Iterator;
36  import java.util.logging.Level;
37  import java.util.logging.Logger;
38  
39  /**
40   * @author mattmann
41   * @author bfoster
42   * @version $Revision$
43   * 
44   *          <p>
45   *          An implementation of the {@link DataTransfer} interface that
46   *          transfers files to a remote file manager over XML-RPC, using the
47   *          File Manager Client.
48   *          </p>
49   * 
50   */
51  public class RemoteDataTransferer implements DataTransfer {
52  
53     /*
54      * the url pointer to the file manager that we'll remotely transfer the file
55      * to
56      */
57     private URL fileManagerUrl = null;
58  
59     /*
60      * the size of the chunks that files should be transferred over XML-RPC using
61      */
62     private int chunkSize = 1024;
63  
64     /* our file manager client */
65     private XmlRpcFileManagerClient client = null;
66  
67     /* our log stream */
68     private static final Logger LOG = Logger
69           .getLogger(RemoteDataTransferer.class.getName());
70  
71     /**
72       * 
73       */
74     public RemoteDataTransferer(int chunkSz) {
75        this.chunkSize = chunkSz;
76     }
77  
78     /*
79      * (non-Javadoc)
80      * 
81      * @see
82      * org.apache.oodt.cas.filemgr.datatransfer.DataTransfer#setFileManagerUrl
83      * (java.net.URL)
84      */
85     public void setFileManagerUrl(URL url) {
86        try {
87           client = new XmlRpcFileManagerClient(url);
88           this.fileManagerUrl = url;
89           LOG.log(Level.INFO, "Remote Data Transfer to: ["
90                 + client.getFileManagerUrl().toString() + "] enabled");
91        } catch (ConnectionException e) {
92           LOG.log(Level.WARNING, "Connection exception for filemgr: [" + url
93                 + "]");
94        }
95     }
96  
97     /*
98      * (non-Javadoc)
99      * 
100     * @see
101     * org.apache.oodt.cas.filemgr.datatransfer.DataTransfer#transferProduct(
102     * org.apache.oodt.cas.filemgr.structs.Product)
103     */
104    public void transferProduct(Product product) throws DataTransferException,
105          IOException {
106 
107       if (fileManagerUrl == null) {
108          throw new DataTransferException(
109                "No file manager url specified for remote data transfer: cannot transfer product: ["
110                      + product.getProductName() + "]!");
111       }
112 
113       quietNotifyTransferProduct(product);
114 
115       // for each file reference, transfer the file to the remote file manager
116       for (Iterator<Reference> i = product.getProductReferences().iterator(); i
117             .hasNext();) {
118          Reference r = i.next();
119          // test whether or not the reference is a directory or a file
120          File refFile = null;
121          try {
122             refFile = new File(new URI(r.getOrigReference()));
123          } catch (URISyntaxException e) {
124             LOG.log(Level.WARNING,
125                   "Unable to test if reference: [" + r.getOrigReference()
126                         + "] is a directory: skipping it");
127             continue;
128          }
129 
130          if (!refFile.isDirectory()) {
131             LOG.log(Level.FINE, "Reference: [" + r.getOrigReference()
132                   + "] is file: transferring it");
133 
134             try {
135                remoteTransfer(r, product);
136             } catch (URISyntaxException e) {
137                LOG.log(Level.WARNING,
138                      "Error transferring file: [" + r.getOrigReference()
139                            + "]: URISyntaxException: " + e.getMessage());
140             }
141          } else {
142             LOG.log(
143                   Level.FINE,
144                   "RemoteTransfer: skipping reference: ["
145                         + refFile.getAbsolutePath() + "] of product: ["
146                         + product.getProductName() + "]: ref is a directory");
147          }
148       }
149 
150       quietNotifyProductTransferComplete(product);
151 
152    }
153 
154    /*
155     * (non-Javadoc)
156     * 
157     * @see
158     * org.apache.oodt.cas.filemgr.datatransfer.DataTransfer#retrieveProduct(org.
159     * apache.oodt.cas.filemgr.structs.Product, java.io.File)
160     */
161    public void retrieveProduct(Product product, File directory)
162          throws DataTransferException, IOException {
163       for (Reference reference : product.getProductReferences()) {
164          FileOutputStream fOut = null;
165          try {
166             File dataStoreFile = new File(new URI(
167                   reference.getDataStoreReference()));
168             File dest = new File(directory, dataStoreFile.getName());
169             fOut = new FileOutputStream(dest, false);
170             LOG.log(
171                   Level.INFO,
172                   "RemoteDataTransfer: Copying File: " + "fmp:"
173                         + dataStoreFile.getAbsolutePath() + " to " + "file:"
174                         + dest.getAbsolutePath());
175             byte[] fileData = null;
176             int offset = 0;
177             while (true) {
178                fileData = (byte[]) client.retrieveFile(
179                      dataStoreFile.getAbsolutePath(), offset, 1024);
180                if (fileData.length <= 0)
181                   break;
182                fOut.write(fileData);
183                if (fileData.length < 1024)
184                   break;
185                offset += 1024;
186             }
187          } catch (Exception e) {
188             throw new DataTransferException("", e);
189          } finally {
190             try {
191                fOut.close();
192             } catch (Exception e) {
193             }
194          }
195       }
196    }
197 
198    private void remoteTransfer(Reference reference, Product product)
199          throws URISyntaxException {
200       // get the file path
201       File origFile = new File(new URI(reference.getOrigReference()));
202       File destFile = new File(new URI(reference.getDataStoreReference()));
203       String origFilePath = origFile.getAbsolutePath();
204       String destFilePath = destFile.getAbsolutePath();
205 
206       // read the file in chunk by chunk
207 
208       byte[] buf = new byte[chunkSize];
209 
210       FileInputStream is = null;
211 
212       try {
213          is = new FileInputStream(origFile);
214          int offset = 0;
215          int numBytes = 0;
216 
217          // remove the file if it already exists: this operation
218          // is an overwrite
219          if (!client.removeFile(destFilePath)) {
220             LOG.log(Level.WARNING,
221                   "RemoteDataTransfer: attempt to perform overwrite of dest file: ["
222                         + destFilePath + "] failed");
223          }
224 
225          while ((numBytes = is.read(buf, offset, chunkSize)) != -1) {
226             client.transferFile(destFilePath, buf, offset, numBytes);
227          }
228       } catch (IOException e) {
229          LOG.log(Level.WARNING,
230                "Error opening input stream to read file to transfer: Message: "
231                      + e.getMessage());
232          return;
233       } catch (DataTransferException e) {
234          LOG.log(
235                Level.WARNING,
236                "DataTransferException when transfering file: [" + origFilePath
237                      + "] to [" + destFilePath + "]: Message: "
238                      + e.getMessage());
239          return;
240       } finally {
241          if (is != null) {
242             try {
243                is.close();
244             } catch (Exception ignore) {
245             }
246 
247             is = null;
248          }
249       }
250    }
251 
252    private void quietNotifyTransferProduct(Product p) {
253       try {
254          client.transferringProduct(p);
255       } catch (DataTransferException e) {
256          e.printStackTrace();
257          LOG.log(Level.WARNING,
258                "Error notifying file manager of product transfer initiation for product: ["
259                      + p.getProductId() + "]: Message: " + e.getMessage());
260          return;
261       }
262    }
263 
264    private void quietNotifyProductTransferComplete(Product p) {
265       try {
266          client.removeProductTransferStatus(p);
267       } catch (DataTransferException e) {
268          e.printStackTrace();
269          LOG.log(Level.WARNING,
270                "Error notifying file manager of product transfer completion for product: ["
271                      + p.getProductId() + "]: Message: " + e.getMessage());
272          return;
273       }
274    }
275 
276 }