View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.oodt.cas.filemgr.datatransfer;
19  
20  //OODT imports
21  import org.apache.oodt.cas.filemgr.structs.Product;
22  import org.apache.oodt.cas.filemgr.structs.Reference;
23  import org.apache.oodt.cas.filemgr.structs.exceptions.ConnectionException;
24  import org.apache.oodt.cas.filemgr.structs.exceptions.DataTransferException;
25  import org.apache.oodt.cas.filemgr.system.XmlRpcFileManagerClient;
26  
27  
28  //JDK imports
29  import java.io.File;
30  import java.io.FileInputStream;
31  import java.io.FileOutputStream;
32  import java.io.IOException;
33  import java.net.URI;
34  import java.net.URISyntaxException;
35  import java.net.URL;
36  import java.util.Iterator;
37  import java.util.logging.Level;
38  import java.util.logging.Logger;
39  
40  /**
41   * @author mattmann
42   * @author bfoster
43   * @version $Revision$
44   * 
45   *          <p>
46   *          An implementation of the {@link DataTransfer} interface that
47   *          transfers files to a remote file manager over XML-RPC, using the
48   *          File Manager Client.
49   *          </p>
50   * 
51   */
52  public class RemoteDataTransferer implements DataTransfer {
53  
54     /*
55      * the url pointer to the file manager that we'll remotely transfer the file
56      * to
57      */
58     private URL fileManagerUrl = null;
59  
60     /*
61      * the size of the chunks that files should be transferred over XML-RPC using
62      */
63     private int chunkSize = 1024;
64  
65     /* our file manager client */
66     private XmlRpcFileManagerClient client = null;
67  
68     /* our log stream */
69     private static final Logger LOG = Logger
70           .getLogger(RemoteDataTransferer.class.getName());
71  
72     /**
73       * 
74       */
75     public RemoteDataTransferer(int chunkSz) {
76        this.chunkSize = chunkSz;
77     }
78  
79     /*
80      * (non-Javadoc)
81      * 
82      * @see
83      * org.apache.oodt.cas.filemgr.datatransfer.DataTransfer#setFileManagerUrl
84      * (java.net.URL)
85      */
86     public void setFileManagerUrl(URL url) {
87        try {
88           client = new XmlRpcFileManagerClient(url);
89           this.fileManagerUrl = url;
90           LOG.log(Level.INFO, "Remote Data Transfer to: ["
91                 + client.getFileManagerUrl().toString() + "] enabled");
92        } catch (ConnectionException e) {
93           LOG.log(Level.WARNING, "Connection exception for filemgr: [" + url
94                 + "]");
95        }
96     }
97  
98     /*
99      * (non-Javadoc)
100     * 
101     * @see
102     * org.apache.oodt.cas.filemgr.datatransfer.DataTransfer#transferProduct(
103     * org.apache.oodt.cas.filemgr.structs.Product)
104     */
105    public void transferProduct(Product product) throws DataTransferException,
106          IOException {
107 
108       if (fileManagerUrl == null) {
109          throw new DataTransferException(
110                "No file manager url specified for remote data transfer: cannot transfer product: ["
111                      + product.getProductName() + "]!");
112       }
113 
114       quietNotifyTransferProduct(product);
115 
116       // for each file reference, transfer the file to the remote file manager
117       for (Iterator<Reference> i = product.getProductReferences().iterator(); i
118             .hasNext();) {
119          Reference r = i.next();
120          // test whether or not the reference is a directory or a file
121          File refFile = null;
122          try {
123             refFile = new File(new URI(r.getOrigReference()));
124          } catch (URISyntaxException e) {
125             LOG.log(Level.WARNING,
126                   "Unable to test if reference: [" + r.getOrigReference()
127                         + "] is a directory: skipping it");
128             continue;
129          }
130 
131          if (!refFile.isDirectory()) {
132             LOG.log(Level.FINE, "Reference: [" + r.getOrigReference()
133                   + "] is file: transferring it");
134 
135             try {
136                remoteTransfer(r, product);
137             } catch (URISyntaxException e) {
138                LOG.log(Level.WARNING,
139                      "Error transferring file: [" + r.getOrigReference()
140                            + "]: URISyntaxException: " + e.getMessage());
141             }
142          } else {
143             LOG.log(
144                   Level.FINE,
145                   "RemoteTransfer: skipping reference: ["
146                         + refFile.getAbsolutePath() + "] of product: ["
147                         + product.getProductName() + "]: ref is a directory");
148          }
149       }
150 
151       quietNotifyProductTransferComplete(product);
152 
153    }
154 
155    /*
156     * (non-Javadoc)
157     * 
158     * @see
159     * org.apache.oodt.cas.filemgr.datatransfer.DataTransfer#retrieveProduct(org.
160     * apache.oodt.cas.filemgr.structs.Product, java.io.File)
161     */
162    public void retrieveProduct(Product product, File directory)
163          throws DataTransferException, IOException {
164       for (Reference reference : product.getProductReferences()) {
165          FileOutputStream fOut = null;
166          try {
167             File dataStoreFile = new File(new URI(
168                   reference.getDataStoreReference()));
169             File dest = new File(directory, dataStoreFile.getName());
170             fOut = new FileOutputStream(dest, false);
171             LOG.log(
172                   Level.INFO,
173                   "RemoteDataTransfer: Copying File: " + "fmp:"
174                         + dataStoreFile.getAbsolutePath() + " to " + "file:"
175                         + dest.getAbsolutePath());
176             byte[] fileData = null;
177             int offset = 0;
178             while (true) {
179                fileData = (byte[]) client.retrieveFile(
180                      dataStoreFile.getAbsolutePath(), offset, 1024);
181                if (fileData.length <= 0)
182                   break;
183                fOut.write(fileData);
184                if (fileData.length < 1024)
185                   break;
186                offset += 1024;
187             }
188          } catch (Exception e) {
189             throw new DataTransferException("", e);
190          } finally {
191             try {
192                fOut.close();
193             } catch (Exception e) {
194             }
195          }
196       }
197    }
198 
199    @Override
200    public void deleteProduct(Product product) throws DataTransferException, IOException {
201      for (Reference ref : product.getProductReferences()) {
202        File dataFile = new File(URI.create(ref.getDataStoreReference()).toURL().getPath());
203        if (!dataFile.delete()) {
204         throw new IOException(String.format("Failed to delete file %s - delete returned false",
205             dataFile));
206        }
207      }
208    }
209    
210    private void remoteTransfer(Reference reference, Product product)
211          throws URISyntaxException {
212       // get the file path
213       File origFile = new File(new URI(reference.getOrigReference()));
214       File destFile = new File(new URI(reference.getDataStoreReference()));
215       String origFilePath = origFile.getAbsolutePath();
216       String destFilePath = destFile.getAbsolutePath();
217 
218       // read the file in chunk by chunk
219 
220       byte[] buf = new byte[chunkSize];
221 
222       FileInputStream is = null;
223 
224       try {
225          is = new FileInputStream(origFile);
226          int offset = 0;
227          int numBytes = 0;
228 
229          // remove the file if it already exists: this operation
230          // is an overwrite
231          if (!client.removeFile(destFilePath)) {
232             LOG.log(Level.WARNING,
233                   "RemoteDataTransfer: attempt to perform overwrite of dest file: ["
234                         + destFilePath + "] failed");
235          }
236 
237          while ((numBytes = is.read(buf, offset, chunkSize)) != -1) {
238             client.transferFile(destFilePath, buf, offset, numBytes);
239          }
240       } catch (IOException e) {
241          LOG.log(Level.WARNING,
242                "Error opening input stream to read file to transfer: Message: "
243                      + e.getMessage());
244          return;
245       } catch (DataTransferException e) {
246          LOG.log(
247                Level.WARNING,
248                "DataTransferException when transfering file: [" + origFilePath
249                      + "] to [" + destFilePath + "]: Message: "
250                      + e.getMessage());
251          return;
252       } finally {
253          if (is != null) {
254             try {
255                is.close();
256             } catch (Exception ignore) {
257             }
258 
259             is = null;
260          }
261       }
262    }
263 
264    private void quietNotifyTransferProduct(Product p) {
265       try {
266          client.transferringProduct(p);
267       } catch (DataTransferException e) {
268          e.printStackTrace();
269          LOG.log(Level.WARNING,
270                "Error notifying file manager of product transfer initiation for product: ["
271                      + p.getProductId() + "]: Message: " + e.getMessage());
272          return;
273       }
274    }
275 
276    private void quietNotifyProductTransferComplete(Product p) {
277       try {
278          client.removeProductTransferStatus(p);
279       } catch (DataTransferException e) {
280          e.printStackTrace();
281          LOG.log(Level.WARNING,
282                "Error notifying file manager of product transfer completion for product: ["
283                      + p.getProductId() + "]: Message: " + e.getMessage());
284          return;
285       }
286    }
287 
288 }