/****************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one   *
 * or more contributor license agreements.  See the NOTICE file *
 * distributed with this work for additional information        *
 * regarding copyright ownership.  The ASF licenses this file   *
 * to you under the Apache License, Version 2.0 (the            *
 * "License"); you may not use this file except in compliance   *
 * with the License.  You may obtain a copy of the License at   *
 *                                                              *
 *   http://www.apache.org/licenses/LICENSE-2.0                 *
 *                                                              *
 * Unless required by applicable law or agreed to in writing,   *
 * software distributed under the License is distributed on an  *
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
 * KIND, either express or implied.  See the License for the    *
 * specific language governing permissions and limitations      *
 * under the License.                                           *
 ****************************************************************/


package org.apache.james.blob.aes;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.util.Collection;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.blob.api.ObjectStoreIOException;
import org.apache.james.core.JamesFileBackedOutputStream;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;

import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteSource;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
import com.google.crypto.tink.subtle.AesGcmHkdfStreaming;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * A {@link BlobStoreDAO} decorator that transparently encrypts blob content with
 * Tink's AES-GCM-HKDF streaming AEAD before delegating persistence to an
 * underlying {@link BlobStoreDAO}, and decrypts it again on reads.
 *
 * <p>Encryption buffers the ciphertext through a {@link JamesFileBackedOutputStream}
 * (in memory up to a threshold, spilling to a temporary file beyond it) so that
 * arbitrarily large payloads can be handled without unbounded heap usage.</p>
 */
public class AESBlobStoreDAO implements BlobStoreDAO {
    // For now, aligned with MimeMessageInputStreamSource file threshold; detailed
    // benchmarking might be conducted to challenge this choice.
    public static final int FILE_THRESHOLD_100_KB = 100 * 1024;
    // Larger spill-to-file threshold used on the read/decrypt path.
    public static final int FILE_THRESHOLD_100_KB_READ = 512 * 1024;
    // Hard cap on decrypted blob size: decrypting streams are truncated at this
    // limit to guard against decompression-bomb style resource exhaustion.
    public static final int MAXIMUM_BLOB_SIZE = 100 * 1024 * 1024;

    private final BlobStoreDAO underlying;
    private final AesGcmHkdfStreaming streamingAead;

    public AESBlobStoreDAO(BlobStoreDAO underlying, CryptoConfig cryptoConfig) {
        this.underlying = underlying;
        this.streamingAead = PBKDF2StreamingAeadFactory.newAesGcmHkdfStreaming(cryptoConfig);
    }

    /**
     * Encrypts {@code input} into a file-backed buffer.
     *
     * @param input plaintext stream; fully consumed but not closed by this method
     * @return the buffer holding the complete ciphertext; the CALLER is responsible
     *         for calling {@code reset()} on it once consumed, to release any
     *         backing temporary file
     * @throws RuntimeException wrapping any encryption failure (the buffer is
     *         reset before rethrowing so no temp file leaks on error)
     */
    public JamesFileBackedOutputStream encrypt(InputStream input) throws IOException {
        JamesFileBackedOutputStream encryptedContent = new JamesFileBackedOutputStream("aesencrypt", FILE_THRESHOLD_100_KB);
        try {
            OutputStream outputStream = streamingAead.newEncryptingStream(encryptedContent, PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA);
            input.transferTo(outputStream);
            // Closing the encrypting stream flushes the final AEAD segment.
            outputStream.close();
            return encryptedContent;
        } catch (Exception e) {
            // Release the backing temp file before propagating.
            encryptedContent.reset();
            throw new RuntimeException("Unable to build payload for object storage, failed to encrypt", e);
        } finally {
            encryptedContent.close();
        }
    }

    /**
     * Wraps {@code ciphertext} in a lazily-decrypting stream, truncated at
     * {@link #MAXIMUM_BLOB_SIZE} plaintext bytes.
     *
     * @throws IOException if the crypto primitives cannot be initialised
     */
    public InputStream decrypt(InputStream ciphertext) throws IOException {
        // We break symmetry and avoid allocating resources like files as we are not able,
        // in higher level APIs (mailbox), to do resource cleanup.
        try {
            return ByteStreams.limit(
                streamingAead.newDecryptingStream(ciphertext, PBKDF2StreamingAeadFactory.EMPTY_ASSOCIATED_DATA),
                MAXIMUM_BLOB_SIZE);
        } catch (GeneralSecurityException e) {
            throw new IOException("Incorrect crypto setup", e);
        }
    }

    /**
     * Decrypts a reactive ciphertext source fully into memory, staging the
     * plaintext through a file-backed buffer to bound heap usage.
     *
     * @param ciphertext reactive source; its advertised size is checked eagerly
     *                   against {@link #MAXIMUM_BLOB_SIZE}
     * @param blobId     used for error reporting only
     * @throws RuntimeException immediately (not via the Mono) when the advertised
     *                          size exceeds the maximum
     */
    public Mono<byte[]> decryptReactiveByteSource(ReactiveByteSource ciphertext, BlobId blobId) {
        if (ciphertext.getSize() > MAXIMUM_BLOB_SIZE) {
            throw new RuntimeException(blobId.asString() + " exceeded maximum blob size");
        }

        return Mono.fromCallable(() -> {
            try {
                JamesFileBackedOutputStream decryptedContent = new JamesFileBackedOutputStream("aes-decrypt-inner", FILE_THRESHOLD_100_KB_READ);
                try {
                    // Count plaintext bytes so the final byte[] can be sized exactly.
                    CountingOutputStream countingOutputStream = new CountingOutputStream(decryptedContent);
                    try (InputStream ciphertextStream = ReactorUtils.toInputStream(Flux.from(ciphertext.getContent()))) {
                        decrypt(ciphertextStream).transferTo(countingOutputStream);
                    }
                    try (InputStream decryptedStream = decryptedContent.asByteSource().openStream()) {
                        return IOUtils.toByteArray(decryptedStream, countingOutputStream.getCount());
                    }
                } finally {
                    // reset() releases the backing temp file if one was created.
                    decryptedContent.reset();
                    decryptedContent.close();
                }
            } catch (OutOfMemoryError error) {
                LoggerFactory.getLogger(AESBlobStoreDAO.class)
                    .error("OOM reading {}. Blob size read so far {} bytes.", blobId.asString(), ciphertext.getSize());
                throw new RuntimeException(error);
            }
        });
    }

    @Override
    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
        try {
            return decrypt(underlying.read(bucketName, blobId));
        } catch (IOException e) {
            throw new ObjectStoreIOException("Error reading blob " + blobId.asString(), e);
        }
    }

    @Override
    public Publisher<InputStream> readReactive(BucketName bucketName, BlobId blobId) {
        return Mono.from(underlying.readReactive(bucketName, blobId))
            .map(Throwing.function(this::decrypt));
    }

    @Override
    public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
        return Mono.from(underlying.readBytes(bucketName, blobId))
            .map(Throwing.function(bytes -> {
                InputStream inputStream = decrypt(new ByteArrayInputStream(bytes));
                // Pre-size the buffer: plaintext is at most ciphertext length plus
                // one AEAD segment of overhead.
                try (UnsynchronizedByteArrayOutputStream outputStream = UnsynchronizedByteArrayOutputStream.builder()
                        .setBufferSize(bytes.length + PBKDF2StreamingAeadFactory.SEGMENT_SIZE)
                        .get()) {
                    IOUtils.copy(inputStream, outputStream);
                    return outputStream.toByteArray();
                }
            }));
    }

    @Override
    public Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkNotNull(blobId);
        Preconditions.checkNotNull(data);

        return save(bucketName, blobId, new ByteArrayInputStream(data));
    }

    @Override
    public Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkNotNull(blobId);
        Preconditions.checkNotNull(inputStream);

        // usingWhen guarantees the file-backed buffer is reset (temp file deleted)
        // whether the underlying save succeeds, errors, or is cancelled.
        return Mono.usingWhen(
            Mono.fromCallable(() -> encrypt(inputStream)),
            encryptedContent -> Mono.from(underlying.save(bucketName, blobId, encryptedContent.asByteSource())),
            Throwing.function(encryptedContent -> Mono.fromRunnable(Throwing.runnable(encryptedContent::reset)).subscribeOn(Schedulers.boundedElastic())))
            .subscribeOn(Schedulers.boundedElastic())
            .onErrorMap(e -> new ObjectStoreIOException("Exception occurred while saving bytearray", e));
    }

    @Override
    public Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
        Preconditions.checkNotNull(bucketName);
        Preconditions.checkNotNull(blobId);
        Preconditions.checkNotNull(content);

        return Mono.using(content::openStream,
            in -> Mono.from(save(bucketName, blobId, in)),
            Throwing.consumer(InputStream::close))
            .subscribeOn(Schedulers.boundedElastic());
    }

    @Override
    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
        return underlying.delete(bucketName, blobId);
    }

    @Override
    public Publisher<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) {
        return underlying.delete(bucketName, blobIds);
    }

    @Override
    public Publisher<Void> deleteBucket(BucketName bucketName) {
        return underlying.deleteBucket(bucketName);
    }

    @Override
    public Publisher<BucketName> listBuckets() {
        return underlying.listBuckets();
    }

    @Override
    public Publisher<BlobId> listBlobs(BucketName bucketName) {
        return underlying.listBlobs(bucketName);
    }
}