One index per type and parallel indexing (#1781)

Before this change the search uses a single index which distinguishes types (repositories, users, etc.) with a field (_type).
But it has turned out that this could lead to problems, in particular if different types have the same field and uses different analyzers for those fields. The following links show even more problems of a combined index:

    https://www.elastic.co/blog/index-vs-type
    https://www.elastic.co/guide/en/elasticsearch/reference/6.0/removal-of-types.html

With this change every type becomes its own index and the SearchEngine gets an api to modify multiple indices at once to remove all documents from all indices, which are related to a specific repository, for example.

The search uses another new api to coordinate the indexing, the central work queue.
The central work queue is able to coordinate long-running or resource intensive tasks. It is able to run tasks in parallel, but can also run tasks which targets the same resources in sequence. The queue is also persistent and can restore queued tasks after restart.

Co-authored-by: Konstantin Schaper <konstantin.schaper@cloudogu.com>
This commit is contained in:
Sebastian Sdorra
2021-08-25 15:40:11 +02:00
committed by GitHub
parent 44f25d6b15
commit 0a26741ebd
72 changed files with 4536 additions and 1420 deletions

View File

@@ -30,8 +30,10 @@ import sonia.scm.plugin.Extension;
import sonia.scm.search.HandlerEventIndexSyncer;
import sonia.scm.search.Id;
import sonia.scm.search.Index;
import sonia.scm.search.IndexLogStore;
import sonia.scm.search.Indexer;
import sonia.scm.search.SearchEngine;
import sonia.scm.search.SerializableIndexTask;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -43,12 +45,10 @@ public class GroupIndexer implements Indexer<Group> {
@VisibleForTesting
static final int VERSION = 1;
private final GroupManager groupManager;
private final SearchEngine searchEngine;
@Inject
public GroupIndexer(GroupManager groupManager, SearchEngine searchEngine) {
this.groupManager = groupManager;
public GroupIndexer(SearchEngine searchEngine) {
this.searchEngine = searchEngine;
}
@@ -62,47 +62,47 @@ public class GroupIndexer implements Indexer<Group> {
return VERSION;
}
@Subscribe(async = false)
public void handleEvent(GroupEvent event) {
new HandlerEventIndexSyncer<>(this).handleEvent(event);
@Override
public Class<? extends ReIndexAllTask<Group>> getReIndexAllTask() {
return ReIndexAll.class;
}
@Override
public Updater<Group> open() {
return new GroupIndexUpdater(groupManager, searchEngine.forType(Group.class).getOrCreate());
public SerializableIndexTask<Group> createStoreTask(Group group) {
return index -> store(index, group);
}
public static class GroupIndexUpdater implements Updater<Group> {
@Override
public SerializableIndexTask<Group> createDeleteTask(Group group) {
return index -> index.delete().byId(Id.of(group));
}
@Subscribe(async = false)
public void handleEvent(GroupEvent event) {
new HandlerEventIndexSyncer<>(searchEngine, this).handleEvent(event);
}
public static void store(Index<Group> index, Group group) {
index.store(Id.of(group), GroupPermissions.read(group).asShiroString(), group);
}
public static class ReIndexAll extends ReIndexAllTask<Group> {
private final GroupManager groupManager;
private final Index<Group> index;
private GroupIndexUpdater(GroupManager groupManager, Index<Group> index) {
@Inject
public ReIndexAll(IndexLogStore logStore, GroupManager groupManager) {
super(logStore, Group.class, VERSION);
this.groupManager = groupManager;
this.index = index;
}
@Override
public void store(Group group) {
index.store(Id.of(group), GroupPermissions.read(group).asShiroString(), group);
}
@Override
public void delete(Group group) {
index.delete().byType().byId(Id.of(group));
}
@Override
public void reIndexAll() {
index.delete().byType().all();
public void update(Index<Group> index) {
index.delete().all();
for (Group group : groupManager.getAll()) {
store(group);
store(index, group);
}
}
@Override
public void close() {
index.close();
}
}
}

View File

@@ -134,6 +134,8 @@ import sonia.scm.web.cgi.DefaultCGIExecutorFactory;
import sonia.scm.web.filter.LoggingFilter;
import sonia.scm.web.security.AdministrationContext;
import sonia.scm.web.security.DefaultAdministrationContext;
import sonia.scm.work.CentralWorkQueue;
import sonia.scm.work.DefaultCentralWorkQueue;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
@@ -290,6 +292,8 @@ class ScmServletModule extends ServletModule {
bind(SearchEngine.class, LuceneSearchEngine.class);
bind(IndexLogStore.class, DefaultIndexLogStore.class);
bind(CentralWorkQueue.class, DefaultCentralWorkQueue.class);
bind(ContentTypeResolver.class).to(DefaultContentTypeResolver.class);
}

View File

@@ -26,12 +26,14 @@ package sonia.scm.repository;
import com.github.legman.Subscribe;
import com.google.common.annotations.VisibleForTesting;
import sonia.scm.HandlerEventType;
import sonia.scm.plugin.Extension;
import sonia.scm.search.HandlerEventIndexSyncer;
import sonia.scm.search.Id;
import sonia.scm.search.Index;
import sonia.scm.search.IndexLogStore;
import sonia.scm.search.Indexer;
import sonia.scm.search.SearchEngine;
import sonia.scm.search.SerializableIndexTask;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -43,12 +45,10 @@ public class RepositoryIndexer implements Indexer<Repository> {
@VisibleForTesting
static final int VERSION = 3;
private final RepositoryManager repositoryManager;
private final SearchEngine searchEngine;
@Inject
public RepositoryIndexer(RepositoryManager repositoryManager, SearchEngine searchEngine) {
this.repositoryManager = repositoryManager;
public RepositoryIndexer(SearchEngine searchEngine) {
this.searchEngine = searchEngine;
}
@@ -62,49 +62,58 @@ public class RepositoryIndexer implements Indexer<Repository> {
return Repository.class;
}
@Override
public Class<? extends ReIndexAllTask<Repository>> getReIndexAllTask() {
return ReIndexAll.class;
}
@Subscribe(async = false)
public void handleEvent(RepositoryEvent event) {
new HandlerEventIndexSyncer<>(this).handleEvent(event);
HandlerEventType type = event.getEventType();
if (type.isPost()) {
Repository repository = event.getItem();
if (type == HandlerEventType.DELETE) {
searchEngine.forIndices()
.forResource(repository)
.batch(createDeleteTask(repository));
} else {
searchEngine.forType(Repository.class)
.update(createStoreTask(repository));
}
}
}
@Override
public Updater<Repository> open() {
return new RepositoryIndexUpdater(repositoryManager, searchEngine.forType(getType()).getOrCreate());
public SerializableIndexTask<Repository> createStoreTask(Repository repository) {
return index -> store(index, repository);
}
public static class RepositoryIndexUpdater implements Updater<Repository> {
@Override
public SerializableIndexTask<Repository> createDeleteTask(Repository repository) {
return index -> index.delete().byRepository(repository);
}
private static void store(Index<Repository> index, Repository repository) {
index.store(Id.of(repository), RepositoryPermissions.read(repository).asShiroString(), repository);
}
public static class ReIndexAll extends ReIndexAllTask<Repository> {
private final RepositoryManager repositoryManager;
private final Index<Repository> index;
public RepositoryIndexUpdater(RepositoryManager repositoryManager, Index<Repository> index) {
@Inject
public ReIndexAll(IndexLogStore logStore, RepositoryManager repositoryManager) {
super(logStore, Repository.class, VERSION);
this.repositoryManager = repositoryManager;
this.index = index;
}
@Override
public void store(Repository repository) {
index.store(Id.of(repository), RepositoryPermissions.read(repository).asShiroString(), repository);
}
@Override
public void delete(Repository repository) {
index.delete().allTypes().byRepository(repository.getId());
}
@Override
public void reIndexAll() {
// v1 used the whole classname as type
index.delete().allTypes().byTypeName(Repository.class.getName());
index.delete().byType().all();
public void update(Index<Repository> index) {
index.delete().all();
for (Repository repository : repositoryManager.getAll()) {
store(repository);
store(index, repository);
}
}
@Override
public void close() {
index.close();
}
}
}

View File

@@ -27,9 +27,7 @@ package sonia.scm.search;
final class FieldNames {
private FieldNames(){}
static final String UID = "_uid";
static final String ID = "_id";
static final String TYPE = "_type";
static final String REPOSITORY = "_repository";
static final String PERMISSION = "_permission";
}

View File

@@ -27,7 +27,6 @@ package sonia.scm.search;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sonia.scm.plugin.Extension;
import sonia.scm.web.security.AdministrationContext;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -43,13 +42,13 @@ public class IndexBootstrapListener implements ServletContextListener {
private static final Logger LOG = LoggerFactory.getLogger(IndexBootstrapListener.class);
private final AdministrationContext administrationContext;
private final SearchEngine searchEngine;
private final IndexLogStore indexLogStore;
private final Set<Indexer> indexers;
@Inject
public IndexBootstrapListener(AdministrationContext administrationContext, IndexLogStore indexLogStore, Set<Indexer> indexers) {
this.administrationContext = administrationContext;
public IndexBootstrapListener(SearchEngine searchEngine, IndexLogStore indexLogStore, Set<Indexer> indexers) {
this.searchEngine = searchEngine;
this.indexLogStore = indexLogStore;
this.indexers = indexers;
}
@@ -65,8 +64,11 @@ public class IndexBootstrapListener implements ServletContextListener {
Optional<IndexLog> indexLog = indexLogStore.defaultIndex().get(indexer.getType());
if (indexLog.isPresent()) {
int version = indexLog.get().getVersion();
if (version < indexer.getVersion()) {
LOG.debug("index version {} is older then {}, start reindexing of all {}", version, indexer.getVersion(), indexer.getType());
if (version != indexer.getVersion()) {
LOG.debug(
"index version {} is older then {}, start reindexing of all {}",
version, indexer.getVersion(), indexer.getType()
);
indexAll(indexer);
}
} else {
@@ -75,14 +77,9 @@ public class IndexBootstrapListener implements ServletContextListener {
}
}
@SuppressWarnings("unchecked")
private void indexAll(Indexer indexer) {
administrationContext.runAsAdmin(() -> {
try (Indexer.Updater updater = indexer.open()) {
updater.reIndexAll();
}
});
indexLogStore.defaultIndex().log(indexer.getType(), indexer.getVersion());
searchEngine.forType(indexer.getType()).update(indexer.getReIndexAllTask());
}
@Override

View File

@@ -0,0 +1,139 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.search;
import lombok.Data;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import sonia.scm.SCMContextProvider;
import sonia.scm.plugin.PluginLoader;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.xml.bind.JAXB;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@Singleton
public class IndexManager {
private final Path directory;
private final AnalyzerFactory analyzerFactory;
private final IndexXml indexXml;
@Inject
public IndexManager(SCMContextProvider context, PluginLoader pluginLoader, AnalyzerFactory analyzerFactory) {
directory = context.resolve(Paths.get("index"));
this.analyzerFactory = analyzerFactory;
this.indexXml = readIndexXml(pluginLoader.getUberClassLoader());
}
private IndexXml readIndexXml(ClassLoader uberClassLoader) {
Path path = directory.resolve("index.xml");
if (Files.exists(path)) {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(uberClassLoader);
return JAXB.unmarshal(path.toFile(), IndexXml.class);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
return new IndexXml();
}
public Collection<? extends IndexDetails> all() {
return Collections.unmodifiableSet(indexXml.indices);
}
public IndexReader openForRead(LuceneSearchableType type, String indexName) throws IOException {
Path path = resolveIndexDirectory(type, indexName);
return DirectoryReader.open(FSDirectory.open(path));
}
public IndexWriter openForWrite(IndexParams indexParams) {
IndexWriterConfig config = new IndexWriterConfig(analyzerFactory.create(indexParams.getSearchableType(), indexParams.getOptions()));
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
Path path = resolveIndexDirectory(indexParams);
if (!Files.exists(path)) {
store(new LuceneIndexDetails(indexParams.getType(), indexParams.getIndex()));
}
try {
return new IndexWriter(FSDirectory.open(path), config);
} catch (IOException ex) {
throw new SearchEngineException("failed to open index at " + path, ex);
}
}
private Path resolveIndexDirectory(IndexParams indexParams) {
return directory.resolve(indexParams.getSearchableType().getName()).resolve(indexParams.getIndex());
}
private Path resolveIndexDirectory(LuceneSearchableType searchableType, String indexName) {
return directory.resolve(searchableType.getName()).resolve(indexName);
}
private synchronized void store(LuceneIndexDetails details) {
if (!indexXml.getIndices().add(details)) {
return;
}
if (!Files.exists(directory)) {
try {
Files.createDirectory(directory);
} catch (IOException e) {
throw new SearchEngineException("failed to create index directory", e);
}
}
Path path = directory.resolve("index.xml");
JAXB.marshal(indexXml, path.toFile());
}
@Data
@XmlRootElement(name = "indices")
@XmlAccessorType(XmlAccessType.FIELD)
public static class IndexXml {
@XmlElement(name = "index")
private Set<LuceneIndexDetails> indices = new HashSet<>();
}
}

View File

@@ -27,10 +27,24 @@ package sonia.scm.search;
import lombok.Value;
@Value
public class IndexParams {
public class IndexParams implements IndexDetails {
String index;
LuceneSearchableType searchableType;
IndexOptions options;
@Override
public Class<?> getType() {
return searchableType.getType();
}
@Override
public String getName() {
return index;
}
@Override
public String toString() {
return searchableType.getName() + "/" + index;
}
}

View File

@@ -24,54 +24,71 @@
package sonia.scm.search;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.TermQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.function.Supplier;
import static sonia.scm.search.FieldNames.ID;
import static sonia.scm.search.FieldNames.PERMISSION;
import static sonia.scm.search.FieldNames.REPOSITORY;
import static sonia.scm.search.FieldNames.TYPE;
import static sonia.scm.search.FieldNames.UID;
class LuceneIndex<T> implements Index<T> {
class LuceneIndex<T> implements Index<T>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(LuceneIndex.class);
private final IndexDetails details;
private final LuceneSearchableType searchableType;
private final IndexWriter writer;
private final SharableIndexWriter writer;
LuceneIndex(LuceneSearchableType searchableType, IndexWriter writer) {
this.searchableType = searchableType;
this.writer = writer;
LuceneIndex(IndexParams params, Supplier<IndexWriter> writerFactory) {
this.details = params;
this.searchableType = params.getSearchableType();
this.writer = new SharableIndexWriter(writerFactory);
this.open();
}
void open() {
writer.open();
}
@VisibleForTesting
SharableIndexWriter getWriter() {
return writer;
}
@Override
public IndexDetails getDetails() {
return details;
}
@Override
public void store(Id id, String permission, Object object) {
String uid = createUid(id, searchableType);
Document document = searchableType.getTypeConverter().convert(object);
try {
field(document, UID, uid);
field(document, ID, id.getValue());
field(document, ID, id.asString());
id.getRepository().ifPresent(repository -> field(document, REPOSITORY, repository));
field(document, TYPE, searchableType.getName());
if (!Strings.isNullOrEmpty(permission)) {
field(document, PERMISSION, permission);
}
writer.updateDocument(new Term(UID, uid), document);
writer.updateDocument(idTerm(id), document);
} catch (IOException e) {
throw new SearchEngineException("failed to add document to index", e);
}
}
private String createUid(Id id, LuceneSearchableType type) {
return id.asString() + "/" + type.getName();
@Nonnull
private Term idTerm(Id id) {
return new Term(ID, id.asString());
}
private void field(Document document, String type, String name) {
@@ -94,24 +111,11 @@ class LuceneIndex<T> implements Index<T> {
private class LuceneDeleter implements Deleter {
@Override
public ByTypeDeleter byType() {
return new LuceneByTypeDeleter();
}
@Override
public AllTypesDeleter allTypes() {
return new LuceneAllTypesDelete();
}
}
@SuppressWarnings("java:S1192")
private class LuceneByTypeDeleter implements ByTypeDeleter {
@Override
public void byId(Id id) {
try {
writer.deleteDocuments(new Term(UID, createUid(id, searchableType)));
long count = writer.deleteDocuments(idTerm(id));
LOG.debug("delete {} document(s) by id {} from index {}", count, id, details);
} catch (IOException e) {
throw new SearchEngineException("failed to delete document from index", e);
}
@@ -120,7 +124,8 @@ class LuceneIndex<T> implements Index<T> {
@Override
public void all() {
try {
writer.deleteDocuments(new Term(TYPE, searchableType.getName()));
long count = writer.deleteAll();
LOG.debug("deleted all {} documents from index {}", count, details);
} catch (IOException ex) {
throw new SearchEngineException("failed to delete documents by type " + searchableType.getName() + " from index", ex);
}
@@ -129,35 +134,16 @@ class LuceneIndex<T> implements Index<T> {
@Override
public void byRepository(String repositoryId) {
try {
BooleanQuery query = new BooleanQuery.Builder()
.add(new TermQuery(new Term(TYPE, searchableType.getName())), BooleanClause.Occur.MUST)
.add(new TermQuery(new Term(REPOSITORY, repositoryId)), BooleanClause.Occur.MUST)
.build();
writer.deleteDocuments(query);
long count = writer.deleteDocuments(repositoryTerm(repositoryId));
LOG.debug("deleted {} documents by repository {} from index {}", count, repositoryId, details);
} catch (IOException ex) {
throw new SearchEngineException("failed to delete documents by repository " + repositoryId + " from index", ex);
}
}
}
private class LuceneAllTypesDelete implements AllTypesDeleter {
@Override
public void byRepository(String repositoryId) {
try {
writer.deleteDocuments(new Term(REPOSITORY, repositoryId));
} catch (IOException ex) {
throw new SearchEngineException("failed to delete all documents by repository " + repositoryId + " from index", ex);
}
}
@Override
public void byTypeName(String typeName) {
try {
writer.deleteDocuments(new Term(TYPE, typeName));
} catch (IOException ex) {
throw new SearchEngineException("failed to delete documents by type " + typeName + " from index", ex);
}
@Nonnull
private Term repositoryTerm(String repositoryId) {
return new Term(REPOSITORY, repositoryId);
}
}
}

View File

@@ -0,0 +1,43 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.search;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
@Data
@XmlRootElement
@NoArgsConstructor
@AllArgsConstructor
@XmlAccessorType(XmlAccessType.FIELD)
public class LuceneIndexDetails implements IndexDetails {
private Class<?> type;
private String name;
}

View File

@@ -24,23 +24,50 @@
package sonia.scm.search;
import javax.inject.Inject;
import java.io.IOException;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Singleton
@SuppressWarnings("unchecked")
public class LuceneIndexFactory {
private final IndexOpener indexOpener;
private final IndexManager indexManager;
@SuppressWarnings("rawtypes")
private final Map<IndexKey, LuceneIndex> indexes = new ConcurrentHashMap<>();
@Inject
public LuceneIndexFactory(IndexOpener indexOpener) {
this.indexOpener = indexOpener;
public LuceneIndexFactory(IndexManager indexManager) {
this.indexManager = indexManager;
}
public <T> LuceneIndex<T> create(IndexParams indexParams) {
try {
return new LuceneIndex<>(indexParams.getSearchableType(), indexOpener.openForWrite(indexParams));
} catch (IOException ex) {
throw new SearchEngineException("failed to open index " + indexParams.getIndex(), ex);
}
return indexes.compute(keyOf(indexParams), (key, index) -> {
if (index != null) {
index.open();
return index;
}
return new LuceneIndex<>(
indexParams,
() -> indexManager.openForWrite(indexParams)
);
});
}
private IndexKey keyOf(IndexParams indexParams) {
return new IndexKey(
indexParams.getSearchableType().getName(), indexParams.getIndex()
);
}
@EqualsAndHashCode
@AllArgsConstructor
private static class IndexKey {
String type;
String name;
}
}

View File

@@ -0,0 +1,76 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.search;
import com.google.inject.Injector;
import javax.inject.Inject;
import java.io.Serializable;
public abstract class LuceneIndexTask implements Runnable, Serializable {
private final Class<?> type;
private final String indexName;
private final IndexOptions options;
private transient LuceneIndexFactory indexFactory;
private transient SearchableTypeResolver searchableTypeResolver;
private transient Injector injector;
protected LuceneIndexTask(IndexParams params) {
this.type = params.getSearchableType().getType();
this.indexName = params.getIndex();
this.options = params.getOptions();
}
@Inject
public void setIndexFactory(LuceneIndexFactory indexFactory) {
this.indexFactory = indexFactory;
}
@Inject
public void setSearchableTypeResolver(SearchableTypeResolver searchableTypeResolver) {
this.searchableTypeResolver = searchableTypeResolver;
}
@Inject
public void setInjector(Injector injector) {
this.injector = injector;
}
public abstract IndexTask<?> task(Injector injector);
@SuppressWarnings({"unchecked", "rawtypes"})
public void run() {
LuceneSearchableType searchableType = searchableTypeResolver.resolve(type);
IndexTask<?> task = task(injector);
try (LuceneIndex index = indexFactory.create(new IndexParams(indexName, searchableType, options))) {
task.update(index);
}
task.afterUpdate();
}
}

View File

@@ -24,31 +24,21 @@
package sonia.scm.search;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Injector;
import sonia.scm.work.Task;
public final class IndexQueueTaskWrapper<T> implements Runnable {
@SuppressWarnings("rawtypes")
public class LuceneInjectingIndexTask extends LuceneIndexTask implements Task {
private static final Logger LOG = LoggerFactory.getLogger(IndexQueueTaskWrapper.class);
private final Class<? extends IndexTask> taskClass;
private final LuceneIndexFactory indexFactory;
private final IndexParams indexParams;
private final Iterable<IndexQueueTask<T>> tasks;
IndexQueueTaskWrapper(LuceneIndexFactory indexFactory, IndexParams indexParams, Iterable<IndexQueueTask<T>> tasks) {
this.indexFactory = indexFactory;
this.indexParams = indexParams;
this.tasks = tasks;
LuceneInjectingIndexTask(IndexParams params, Class<? extends IndexTask> taskClass) {
super(params);
this.taskClass = taskClass;
}
@Override
public void run() {
try (Index<T> index = indexFactory.create(indexParams)) {
for (IndexQueueTask<T> task : tasks) {
task.updateIndex(index);
}
} catch (Exception e) {
LOG.warn("failure during execution of index task for index {}", indexParams.getIndex(), e);
}
public IndexTask<?> task(Injector injector) {
return injector.getInstance(taskClass);
}
}

View File

@@ -54,12 +54,12 @@ public class LuceneQueryBuilder<T> extends QueryBuilder<T> {
private static final Logger LOG = LoggerFactory.getLogger(LuceneQueryBuilder.class);
private final IndexOpener opener;
private final IndexManager opener;
private final LuceneSearchableType searchableType;
private final String indexName;
private final Analyzer analyzer;
LuceneQueryBuilder(IndexOpener opener, String indexName, LuceneSearchableType searchableType, Analyzer analyzer) {
LuceneQueryBuilder(IndexManager opener, String indexName, LuceneSearchableType searchableType, Analyzer analyzer) {
this.opener = opener;
this.indexName = indexName;
this.searchableType = searchableType;
@@ -88,11 +88,11 @@ public class LuceneQueryBuilder<T> extends QueryBuilder<T> {
String queryString = Strings.nullToEmpty(queryParams.getQueryString());
Query parsedQuery = createQuery(searchableType, queryParams, queryString);
Query query = Queries.filter(parsedQuery, searchableType, queryParams);
Query query = Queries.filter(parsedQuery, queryParams);
if (LOG.isDebugEnabled()) {
LOG.debug("execute lucene query: {}", query);
}
try (IndexReader reader = opener.openForRead(indexName)) {
try (IndexReader reader = opener.openForRead(searchableType, indexName)) {
IndexSearcher searcher = new IndexSearcher(reader);
searcher.search(query, new PermissionAwareCollector(reader, collector));

View File

@@ -28,18 +28,18 @@ import javax.inject.Inject;
public class LuceneQueryBuilderFactory {
private final IndexOpener indexOpener;
private final IndexManager indexManager;
private final AnalyzerFactory analyzerFactory;
@Inject
public LuceneQueryBuilderFactory(IndexOpener indexOpener, AnalyzerFactory analyzerFactory) {
this.indexOpener = indexOpener;
public LuceneQueryBuilderFactory(IndexManager indexManager, AnalyzerFactory analyzerFactory) {
this.indexManager = indexManager;
this.analyzerFactory = analyzerFactory;
}
public <T> LuceneQueryBuilder<T> create(IndexParams indexParams) {
return new LuceneQueryBuilder<>(
indexOpener,
indexManager,
indexParams.getIndex(),
indexParams.getSearchableType(),
analyzerFactory.create(indexParams.getSearchableType(), indexParams.getOptions())

View File

@@ -24,24 +24,34 @@
package sonia.scm.search;
import com.google.common.base.Joiner;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.subject.Subject;
import sonia.scm.work.CentralWorkQueue;
import sonia.scm.work.CentralWorkQueue.Enqueue;
import sonia.scm.work.Task;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class LuceneSearchEngine implements SearchEngine {
private final IndexManager indexManager;
private final SearchableTypeResolver resolver;
private final IndexQueue indexQueue;
private final LuceneQueryBuilderFactory queryBuilderFactory;
private final CentralWorkQueue centralWorkQueue;
@Inject
public LuceneSearchEngine(SearchableTypeResolver resolver, IndexQueue indexQueue, LuceneQueryBuilderFactory queryBuilderFactory) {
public LuceneSearchEngine(IndexManager indexManager, SearchableTypeResolver resolver, LuceneQueryBuilderFactory queryBuilderFactory, CentralWorkQueue centralWorkQueue) {
this.indexManager = indexManager;
this.resolver = resolver;
this.indexQueue = indexQueue;
this.queryBuilderFactory = queryBuilderFactory;
this.centralWorkQueue = centralWorkQueue;
}
@Override
@@ -67,11 +77,79 @@ public class LuceneSearchEngine implements SearchEngine {
return new LuceneForType<>(searchableType);
}
private void enqueue(LuceneSearchableType searchableType, String index, List<String> resources, Task task) {
Enqueue enqueuer = centralWorkQueue.append();
String resourceName = Joiner.on('-').join(searchableType.getName(), index, "index");
if (resources.isEmpty()) {
enqueuer.locks(resourceName);
} else {
for (String resource : resources) {
enqueuer.locks(resourceName, resource);
}
}
enqueuer.runAsAdmin().enqueue(task);
}
@Override
public ForIndices forIndices() {
return new LuceneForIndices();
}
class LuceneForIndices implements ForIndices {
private final List<String> resources = new ArrayList<>();
private Predicate<IndexDetails> predicate = details -> true;
private IndexOptions options = IndexOptions.defaults();
@Override
public ForIndices matching(Predicate<IndexDetails> predicate) {
this.predicate = predicate;
return this;
}
@Override
public ForIndices withOptions(IndexOptions options) {
this.options = options;
return this;
}
@Override
public ForIndices forResource(String resource) {
this.resources.add(resource);
return this;
}
@Override
public void batch(SerializableIndexTask<?> task) {
exec(params -> batch(params, new LuceneSimpleIndexTask(params, task)));
}
@Override
public void batch(Class<? extends IndexTask<?>> task) {
exec(params -> batch(params, new LuceneInjectingIndexTask(params, task)));
}
private void exec(Consumer<IndexParams> consumer) {
indexManager.all()
.stream()
.filter(predicate)
.map(details -> new IndexParams(details.getName(), resolver.resolve(details.getType()), options))
.forEach(consumer);
}
private void batch(IndexParams params, Task task) {
LuceneSearchEngine.this.enqueue(params.getSearchableType(), params.getIndex(), resources, task);
}
}
class LuceneForType<T> implements ForType<T> {
private final LuceneSearchableType searchableType;
private IndexOptions options = IndexOptions.defaults();
private String index = "default";
private final List<String> resources = new ArrayList<>();
private LuceneForType(LuceneSearchableType searchableType) {
this.searchableType = searchableType;
@@ -94,8 +172,23 @@ public class LuceneSearchEngine implements SearchEngine {
}
@Override
public Index<T> getOrCreate() {
return indexQueue.getQueuedIndex(params());
public ForType<T> forResource(String resource) {
resources.add(resource);
return this;
}
@Override
public void update(Class<? extends IndexTask<T>> task) {
enqueue(new LuceneInjectingIndexTask(params(), task));
}
@Override
public void update(SerializableIndexTask<T> task) {
enqueue(new LuceneSimpleIndexTask(params(), task));
}
private void enqueue(Task task) {
LuceneSearchEngine.this.enqueue(searchableType, index, resources, task);
}
@Override

View File

@@ -24,53 +24,21 @@
package sonia.scm.search;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Injector;
import sonia.scm.work.Task;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public final class LuceneSimpleIndexTask extends LuceneIndexTask implements Task {
@Singleton
public class IndexQueue implements Closeable {
private final SerializableIndexTask<?> task;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final AtomicLong size = new AtomicLong(0);
private final LuceneIndexFactory indexFactory;
@Inject
public IndexQueue(LuceneIndexFactory indexFactory) {
this.indexFactory = indexFactory;
}
public <T> Index<T> getQueuedIndex(IndexParams indexParams) {
return new QueuedIndex<>(this, indexParams);
}
public LuceneIndexFactory getIndexFactory() {
return indexFactory;
}
<T> void enqueue(IndexQueueTaskWrapper<T> task) {
size.incrementAndGet();
executor.execute(() -> {
task.run();
size.decrementAndGet();
});
}
@VisibleForTesting
long getSize() {
return size.get();
LuceneSimpleIndexTask(IndexParams params, SerializableIndexTask<?> task) {
super(params);
this.task = task;
}
@Override
public void close() throws IOException {
executor.shutdown();
public IndexTask<?> task(Injector injector) {
injector.injectMembers(task);
return task;
}
}

View File

@@ -29,6 +29,8 @@ import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import java.util.Optional;
import static org.apache.lucene.search.BooleanClause.Occur.MUST;
final class Queries {
@@ -36,19 +38,18 @@ final class Queries {
private Queries() {
}
private static Query typeQuery(LuceneSearchableType type) {
return new TermQuery(new Term(FieldNames.TYPE, type.getName()));
}
private static Query repositoryQuery(String repositoryId) {
return new TermQuery(new Term(FieldNames.REPOSITORY, repositoryId));
}
static Query filter(Query query, LuceneSearchableType searchableType, QueryBuilder.QueryParams params) {
BooleanQuery.Builder builder = new BooleanQuery.Builder()
.add(query, MUST)
.add(typeQuery(searchableType), MUST);
params.getRepositoryId().ifPresent(repo -> builder.add(repositoryQuery(repo), MUST));
return builder.build();
static Query filter(Query query, QueryBuilder.QueryParams params) {
Optional<String> repositoryId = params.getRepositoryId();
if (repositoryId.isPresent()) {
return new BooleanQuery.Builder()
.add(query, MUST)
.add(repositoryQuery(repositoryId.get()), MUST)
.build();
}
return query;
}
}

View File

@@ -1,102 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.search;
import java.util.ArrayList;
import java.util.List;
class QueuedIndex<T> implements Index<T> {
private final IndexQueue queue;
private final IndexParams indexParams;
private final List<IndexQueueTask<T>> tasks = new ArrayList<>();
QueuedIndex(IndexQueue queue, IndexParams indexParams) {
this.queue = queue;
this.indexParams = indexParams;
}
@Override
public void store(Id id, String permission, T object) {
tasks.add(index -> index.store(id, permission, object));
}
@Override
public Deleter delete() {
return new QueueDeleter();
}
@Override
public void close() {
IndexQueueTaskWrapper<T> wrappedTask = new IndexQueueTaskWrapper<>(
queue.getIndexFactory(), indexParams, tasks
);
queue.enqueue(wrappedTask);
}
private class QueueDeleter implements Deleter {
@Override
public ByTypeDeleter byType() {
return new QueueByTypeDeleter();
}
@Override
public AllTypesDeleter allTypes() {
return new QueueAllTypesDeleter();
}
}
private class QueueByTypeDeleter implements ByTypeDeleter {
@Override
public void byId(Id id) {
tasks.add(index -> index.delete().byType().byId(id));
}
@Override
public void all() {
tasks.add(index -> index.delete().byType().all());
}
@Override
public void byRepository(String repositoryId) {
tasks.add(index -> index.delete().byType().byRepository(repositoryId));
}
}
private class QueueAllTypesDeleter implements AllTypesDeleter {
@Override
public void byRepository(String repositoryId) {
tasks.add(index -> index.delete().allTypes().byRepository(repositoryId));
}
@Override
public void byTypeName(String typeName) {
tasks.add(index -> index.delete().allTypes().byTypeName(typeName));
}
}
}

View File

@@ -0,0 +1,90 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.search;
import com.google.common.annotations.VisibleForTesting;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.function.Supplier;
class SharableIndexWriter {
private static final Logger LOG = LoggerFactory.getLogger(SharableIndexWriter.class);
private int usageCounter = 0;
private final Supplier<IndexWriter> writerFactory;
private IndexWriter writer;
SharableIndexWriter(Supplier<IndexWriter> writerFactory) {
this.writerFactory = writerFactory;
}
synchronized void open() {
usageCounter++;
if (usageCounter == 1) {
LOG.trace("open writer, because usage increased from zero to one");
writer = writerFactory.get();
} else {
LOG.trace("new task is using the writer, counter is now at {}", usageCounter);
}
}
@VisibleForTesting
int getUsageCounter() {
return usageCounter;
}
void updateDocument(Term term, Document document) throws IOException {
writer.updateDocument(term, document);
}
long deleteDocuments(Term term) throws IOException {
return writer.deleteDocuments(term);
}
long deleteAll() throws IOException {
return writer.deleteAll();
}
synchronized void close() throws IOException {
usageCounter--;
if (usageCounter == 0) {
LOG.trace("no one seems to use index any longer, closing underlying writer");
writer.close();
writer = null;
} else if (usageCounter > 0) {
LOG.trace("index is still used by {} task(s), commit work but keep writer open", usageCounter);
writer.commit();
} else {
LOG.warn("index is already closed");
}
}
}

View File

@@ -0,0 +1,128 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.security;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.mgt.SecurityManager;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.subject.support.SubjectThreadState;
import org.apache.shiro.util.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
/**
* Impersonator allows the usage of scm-manager api in the context of another user.
*
* @since 2.23.0
*/
public final class Impersonator {
private static final Logger LOG = LoggerFactory.getLogger(Impersonator.class);
private final SecurityManager securityManager;
@Inject
public Impersonator(SecurityManager securityManager) {
this.securityManager = securityManager;
}
public Session impersonate(PrincipalCollection principal) {
Subject subject = createSubject(principal);
if (ThreadContext.getSecurityManager() != null) {
return new WebImpersonator(subject);
}
return new NonWebImpersonator(securityManager, subject);
}
private Subject createSubject(PrincipalCollection principal) {
return new Subject.Builder(securityManager)
.authenticated(true)
.principals(principal)
.buildSubject();
}
public interface Session extends AutoCloseable {
void close();
}
private static class WebImpersonator implements Session {
private final Subject subject;
private final Subject previousSubject;
private WebImpersonator(Subject subject) {
this.subject = subject;
this.previousSubject = SecurityUtils.getSubject();
bind();
}
private void bind() {
LOG.debug("user {} start impersonate session as {}", previousSubject.getPrincipal(), subject.getPrincipal());
// do not use runas, because we want only bind the session to this thread.
// Runas could affect other threads.
ThreadContext.bind(this.subject);
}
@Override
public void close() {
LOG.debug("release impersonate session from user {} to {}", previousSubject.getPrincipal(), subject.getPrincipal());
ThreadContext.bind(previousSubject);
}
}
private static class NonWebImpersonator implements Session {
private final SecurityManager securityManager;
private final SubjectThreadState state;
private final Subject subject;
private NonWebImpersonator(SecurityManager securityManager, Subject subject) {
this.securityManager = securityManager;
this.state = new SubjectThreadState(subject);
this.subject = subject;
bind();
}
private void bind() {
LOG.debug("start impersonate session as user {}", subject.getPrincipal());
SecurityUtils.setSecurityManager(securityManager);
state.bind();
}
@Override
public void close() {
LOG.debug("release impersonate session of {}", subject.getPrincipal());
state.restore();
SecurityUtils.setSecurityManager(null);
}
}
}

View File

@@ -22,44 +22,59 @@
* SOFTWARE.
*/
package sonia.scm.search;
package sonia.scm.update.index;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import sonia.scm.SCMContextProvider;
import sonia.scm.migration.UpdateStep;
import sonia.scm.plugin.Extension;
import sonia.scm.util.IOUtil;
import sonia.scm.version.Version;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class IndexOpener {
import static sonia.scm.store.StoreConstants.DATA_DIRECTORY_NAME;
import static sonia.scm.store.StoreConstants.VARIABLE_DATA_DIRECTORY_NAME;
private final Path directory;
private final AnalyzerFactory analyzerFactory;
@Extension
public class RemoveCombinedIndex implements UpdateStep {
private final SCMContextProvider contextProvider;
@Inject
public IndexOpener(SCMContextProvider context, AnalyzerFactory analyzerFactory) {
directory = context.resolve(Paths.get("index"));
this.analyzerFactory = analyzerFactory;
public RemoveCombinedIndex(SCMContextProvider contextProvider) {
this.contextProvider = contextProvider;
}
public IndexReader openForRead(String name) throws IOException {
return DirectoryReader.open(directory(name));
@Override
public void doUpdate() throws IOException {
Path index = contextProvider.resolve(Paths.get("index"));
if (Files.exists(index)) {
IOUtil.delete(index.toFile());
}
Path indexLog = contextProvider.resolve(indexLogPath());
if (Files.exists(indexLog)) {
IOUtil.delete(indexLog.toFile());
}
}
public IndexWriter openForWrite(IndexParams indexParams) throws IOException {
IndexWriterConfig config = new IndexWriterConfig(analyzerFactory.create(indexParams.getSearchableType(), indexParams.getOptions()));
config.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
return new IndexWriter(directory(indexParams.getIndex()), config);
@Nonnull
private Path indexLogPath() {
return Paths.get(VARIABLE_DATA_DIRECTORY_NAME).resolve(DATA_DIRECTORY_NAME).resolve("index-log");
}
private Directory directory(String name) throws IOException {
return FSDirectory.open(directory.resolve(name));
@Override
public Version getTargetVersion() {
return Version.parse("2.0.0");
}
@Override
public String getAffectedDataType() {
return "sonia.scm.index";
}
}

View File

@@ -30,8 +30,10 @@ import sonia.scm.plugin.Extension;
import sonia.scm.search.HandlerEventIndexSyncer;
import sonia.scm.search.Id;
import sonia.scm.search.Index;
import sonia.scm.search.IndexLogStore;
import sonia.scm.search.Indexer;
import sonia.scm.search.SearchEngine;
import sonia.scm.search.SerializableIndexTask;
import javax.inject.Inject;
import javax.inject.Singleton;
@@ -43,12 +45,10 @@ public class UserIndexer implements Indexer<User> {
@VisibleForTesting
static final int VERSION = 1;
private final UserManager userManager;
private final SearchEngine searchEngine;
@Inject
public UserIndexer(UserManager userManager, SearchEngine searchEngine) {
this.userManager = userManager;
public UserIndexer(SearchEngine searchEngine) {
this.searchEngine = searchEngine;
}
@@ -62,47 +62,46 @@ public class UserIndexer implements Indexer<User> {
return VERSION;
}
@Subscribe(async = false)
public void handleEvent(UserEvent event) {
new HandlerEventIndexSyncer<>(this).handleEvent(event);
@Override
public Class<? extends ReIndexAllTask<User>> getReIndexAllTask() {
return ReIndexAll.class;
}
@Override
public Updater<User> open() {
return new UserIndexUpdater(userManager, searchEngine.forType(User.class).getOrCreate());
public SerializableIndexTask<User> createStoreTask(User user) {
return index -> store(index, user);
}
public static class UserIndexUpdater implements Updater<User> {
@Override
public SerializableIndexTask<User> createDeleteTask(User item) {
return index -> index.delete().byId(Id.of(item));
}
@Subscribe(async = false)
public void handleEvent(UserEvent event) {
new HandlerEventIndexSyncer<>(searchEngine, this).handleEvent(event);
}
private static void store(Index<User> index, User user) {
index.store(Id.of(user), UserPermissions.read(user).asShiroString(), user);
}
public static class ReIndexAll extends ReIndexAllTask<User> {
private final UserManager userManager;
private final Index<User> index;
private UserIndexUpdater(UserManager userManager, Index<User> index) {
@Inject
public ReIndexAll(IndexLogStore logStore, UserManager userManager) {
super(logStore, User.class, VERSION);
this.userManager = userManager;
this.index = index;
}
@Override
public void store(User user) {
index.store(Id.of(user), UserPermissions.read(user).asShiroString(), user);
}
@Override
public void delete(User user) {
index.delete().byType().byId(Id.of(user));
}
@Override
public void reIndexAll() {
index.delete().byType().all();
public void update(Index<User> index) {
index.delete().all();
for (User user : userManager.getAll()) {
store(user);
store(index, user);
}
}
@Override
public void close() {
index.close();
}
}
}

View File

@@ -21,10 +21,12 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.web.security;
final class AdministrationContextMarker {
import java.io.Serializable;
final class AdministrationContextMarker implements Serializable {
static final AdministrationContextMarker MARKER = new AdministrationContextMarker();

View File

@@ -24,223 +24,70 @@
package sonia.scm.web.security;
//~--- non-JDK imports --------------------------------------------------------
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Singleton;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.shiro.subject.SimplePrincipalCollection;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.subject.support.SubjectThreadState;
import org.apache.shiro.util.ThreadContext;
import org.apache.shiro.util.ThreadState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sonia.scm.SCMContext;
import sonia.scm.security.Authentications;
import sonia.scm.security.Role;
import sonia.scm.security.Impersonator;
import sonia.scm.user.User;
import sonia.scm.util.AssertUtil;
//~--- JDK imports ------------------------------------------------------------
/**
*
* @author Sebastian Sdorra
*/
@Singleton
public class DefaultAdministrationContext implements AdministrationContext
{
public class DefaultAdministrationContext implements AdministrationContext {
/** Field description */
private static final User SYSTEM_ACCOUNT = new User(
Authentications.PRINCIPAL_SYSTEM,
"SCM-Manager System Account",
null
);
/** Field description */
static final String REALM = "AdminRealm";
/** the logger for DefaultAdministrationContext */
private static final Logger logger =
LoggerFactory.getLogger(DefaultAdministrationContext.class);
private static final Logger LOG = LoggerFactory.getLogger(DefaultAdministrationContext.class);
//~--- constructors ---------------------------------------------------------
private final Injector injector;
private final Impersonator impersonator;
private final PrincipalCollection adminPrincipal;
/**
* Constructs ...
*
*
* @param injector
* @param securityManager
*/
@Inject
public DefaultAdministrationContext(Injector injector,
org.apache.shiro.mgt.SecurityManager securityManager)
{
public DefaultAdministrationContext(Injector injector, Impersonator impersonator) {
this.injector = injector;
this.securityManager = securityManager;
principalCollection = createAdminCollection(SYSTEM_ACCOUNT);
this.impersonator = impersonator;
this.adminPrincipal = createAdminPrincipal();
}
//~--- methods --------------------------------------------------------------
/**
* Method description
*
*
* @param action
*/
@Override
public void runAsAdmin(PrivilegedAction action)
{
AssertUtil.assertIsNotNull(action);
if (ThreadContext.getSecurityManager() != null)
{
doRunAsInWebSessionContext(action);
}
else
{
doRunAsInNonWebSessionContext(action);
}
}
/**
* Method description
*
*
* @param actionClass
*/
@Override
public void runAsAdmin(Class<? extends PrivilegedAction> actionClass)
{
PrivilegedAction action = injector.getInstance(actionClass);
runAsAdmin(action);
}
/**
* Method description
*
*
* @param adminUser
*
* @return
*/
private PrincipalCollection createAdminCollection(User adminUser)
{
public static PrincipalCollection createAdminPrincipal() {
SimplePrincipalCollection collection = new SimplePrincipalCollection();
collection.add(adminUser.getId(), REALM);
collection.add(adminUser, REALM);
collection.add(SYSTEM_ACCOUNT.getId(), REALM);
collection.add(SYSTEM_ACCOUNT, REALM);
collection.add(AdministrationContextMarker.MARKER, REALM);
return collection;
}
/**
* Method description
*
*
* @return
*/
private Subject createAdminSubject()
{
//J-
return new Subject.Builder(securityManager)
.authenticated(true)
.principals(principalCollection)
.buildSubject();
//J+
}
private void doRunAsInNonWebSessionContext(PrivilegedAction action) {
logger.trace("bind shiro security manager to current thread");
try {
SecurityUtils.setSecurityManager(securityManager);
Subject subject = createAdminSubject();
ThreadState state = new SubjectThreadState(subject);
state.bind();
try
{
logger.debug("execute action {} in administration context", action.getClass().getName());
action.run();
} finally {
logger.trace("restore current thread state");
state.restore();
}
} finally {
SecurityUtils.setSecurityManager(null);
}
}
/**
* Method description
*
*
* @param action
*/
private void doRunAsInWebSessionContext(PrivilegedAction action)
{
Subject subject = SecurityUtils.getSubject();
String principal = (String) subject.getPrincipal();
if (logger.isInfoEnabled())
{
String username;
if (subject.hasRole(Role.USER))
{
username = principal;
}
else
{
username = SCMContext.USER_ANONYMOUS;
}
logger.debug("user {} executes {} as admin", username, action.getClass().getName());
}
Subject adminSubject = createAdminSubject();
// do not use runas, because we want only execute this action in this
// thread as administrator. Runas could affect other threads
ThreadContext.bind(adminSubject);
try
{
@Override
public void runAsAdmin(PrivilegedAction action) {
AssertUtil.assertIsNotNull(action);
LOG.debug("execute action {} in administration context", action.getClass().getName());
try (Impersonator.Session session = impersonator.impersonate(adminPrincipal)) {
action.run();
}
finally
{
logger.debug("release administration context for user {}/{}", principal,
subject.getPrincipal());
ThreadContext.bind(subject);
}
}
//~--- fields ---------------------------------------------------------------
@Override
public void runAsAdmin(Class<? extends PrivilegedAction> actionClass) {
PrivilegedAction action = injector.getInstance(actionClass);
runAsAdmin(action);
}
/** Field description */
private final Injector injector;
/** Field description */
private final org.apache.shiro.mgt.SecurityManager securityManager;
/** Field description */
private PrincipalCollection principalCollection;
}

View File

@@ -0,0 +1,213 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.work;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.subject.PrincipalCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sonia.scm.metrics.Metrics;
import sonia.scm.web.security.DefaultAdministrationContext;
import javax.annotation.Nullable;
import javax.inject.Singleton;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntSupplier;
@Singleton
public class DefaultCentralWorkQueue implements CentralWorkQueue, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(DefaultCentralWorkQueue.class);
private final List<UnitOfWork> queue = new ArrayList<>();
private final List<Resource> lockedResources = new ArrayList<>();
private final AtomicInteger size = new AtomicInteger();
private final AtomicLong order = new AtomicLong();
private final Injector injector;
private final Persistence persistence;
private final ExecutorService executor;
private final MeterRegistry meterRegistry;
@Inject
public DefaultCentralWorkQueue(Injector injector, Persistence persistence, MeterRegistry meterRegistry) {
this(injector, persistence, meterRegistry, new ThreadCountProvider());
}
@VisibleForTesting
DefaultCentralWorkQueue(Injector injector, Persistence persistence, MeterRegistry meterRegistry, IntSupplier threadCountProvider) {
this.injector = injector;
this.persistence = persistence;
this.executor = createExecutorService(meterRegistry, threadCountProvider.getAsInt());
this.meterRegistry = meterRegistry;
loadFromDisk();
}
private static ExecutorService createExecutorService(MeterRegistry registry, int threadCount) {
ExecutorService executorService = Executors.newFixedThreadPool(
threadCount,
new ThreadFactoryBuilder()
.setNameFormat("CentralWorkQueue-%d")
.build()
);
Metrics.executor(registry, executorService, "CentralWorkQueue", "fixed");
return executorService;
}
@Override
public Enqueue append() {
return new DefaultEnqueue();
}
@Override
public int getSize() {
return size.get();
}
@Override
public void close() {
executor.shutdown();
}
private void loadFromDisk() {
for (UnitOfWork unitOfWork : persistence.loadAll()) {
unitOfWork.restore(order.incrementAndGet());
append(unitOfWork);
}
run();
}
private synchronized void append(UnitOfWork unitOfWork) {
persistence.store(unitOfWork);
int queueSize = size.incrementAndGet();
queue.add(unitOfWork);
LOG.debug("add task {} to queue, queue size is now {}", unitOfWork, queueSize);
}
private synchronized void run() {
Iterator<UnitOfWork> iterator = queue.iterator();
while (iterator.hasNext()) {
UnitOfWork unitOfWork = iterator.next();
if (isRunnable(unitOfWork)) {
run(unitOfWork);
iterator.remove();
} else {
unitOfWork.blocked();
}
}
}
private void run(UnitOfWork unitOfWork) {
lockedResources.addAll(unitOfWork.getLocks());
unitOfWork.init(injector, this::finalizeWork, meterRegistry);
LOG.trace("pass task {} to executor", unitOfWork);
executor.execute(unitOfWork);
}
private synchronized void finalizeWork(UnitOfWork unitOfWork) {
for (Resource lock : unitOfWork.getLocks()) {
lockedResources.remove(lock);
}
persistence.remove(unitOfWork);
int queueSize = size.decrementAndGet();
LOG.debug("finish task, queue size is now {}", queueSize);
run();
}
private boolean isRunnable(UnitOfWork unitOfWork) {
for (Resource resource : unitOfWork.getLocks()) {
for (Resource lock : lockedResources) {
if (resource.isBlockedBy(lock)) {
LOG.trace("skip {}, because resource {} is locked by {}", unitOfWork, resource, lock);
return false;
}
}
}
return true;
}
private class DefaultEnqueue implements Enqueue {
private final Set<Resource> locks = new HashSet<>();
private boolean runAsAdmin = false;
@Override
public Enqueue locks(String resourceType) {
locks.add(new Resource(resourceType));
return this;
}
@Override
public Enqueue locks(String resource, @Nullable String id) {
locks.add(new Resource(resource, id));
return this;
}
@Override
public Enqueue runAsAdmin() {
this.runAsAdmin = true;
return this;
}
@Override
public void enqueue(Task task) {
appendAndRun(new SimpleUnitOfWork(order.incrementAndGet(), principal(), locks, task));
}
@Override
public void enqueue(Class<? extends Runnable> task) {
appendAndRun(new InjectingUnitOfWork(order.incrementAndGet(), principal(), locks, task));
}
private PrincipalCollection principal() {
if (runAsAdmin) {
return DefaultAdministrationContext.createAdminPrincipal();
}
return SecurityUtils.getSubject().getPrincipals();
}
private synchronized void appendAndRun(UnitOfWork unitOfWork) {
append(unitOfWork);
run();
}
}
}

View File

@@ -22,11 +22,11 @@
* SOFTWARE.
*/
package sonia.scm.search;
package sonia.scm.work;
@FunctionalInterface
public interface IndexQueueTask<T> {
interface Finalizer {
void updateIndex(Index<T> index);
void finalizeWork(UnitOfWork unitOfWork);
}

View File

@@ -0,0 +1,47 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.work;
import com.google.inject.Injector;
import lombok.EqualsAndHashCode;
import org.apache.shiro.subject.PrincipalCollection;
import java.util.Set;
@EqualsAndHashCode(callSuper = true)
class InjectingUnitOfWork extends UnitOfWork {
private final Class<? extends Runnable> task;
InjectingUnitOfWork(long order, PrincipalCollection principal, Set<Resource> locks, Class<? extends Runnable> task) {
super(order, principal, locks);
this.task = task;
}
@Override
protected Runnable task(Injector injector) {
return injector.getInstance(task);
}
}

View File

@@ -0,0 +1,108 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.work;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sonia.scm.plugin.PluginLoader;
import sonia.scm.store.Blob;
import sonia.scm.store.BlobStore;
import sonia.scm.store.BlobStoreFactory;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
class Persistence {
private static final Logger LOG = LoggerFactory.getLogger(Persistence.class);
private static final String STORE_NAME = "central-work-queue";
private final ClassLoader classLoader;
private final BlobStore store;
@Inject
public Persistence(PluginLoader pluginLoader, BlobStoreFactory storeFactory) {
this(pluginLoader.getUberClassLoader(), storeFactory.withName(STORE_NAME).build());
}
@VisibleForTesting
Persistence(ClassLoader classLoader, BlobStore store) {
this.classLoader = classLoader;
this.store = store;
}
Collection<UnitOfWork> loadAll() {
List<UnitOfWork> chunks = new ArrayList<>();
for (Blob blob : store.getAll()) {
load(blob).ifPresent(chunkOfWork -> {
chunkOfWork.assignStorageId(null);
chunks.add(chunkOfWork);
});
store.remove(blob);
}
Collections.sort(chunks);
return chunks;
}
private Optional<UnitOfWork> load(Blob blob) {
try (ObjectInputStream stream = new ClassLoaderObjectInputStream(classLoader, blob.getInputStream())) {
Object o = stream.readObject();
if (o instanceof UnitOfWork) {
return Optional.of((UnitOfWork) o);
} else {
LOG.error("loaded object is not a instance of {}: {}", UnitOfWork.class, o);
}
} catch (IOException | ClassNotFoundException ex) {
LOG.error("failed to load task from store", ex);
}
return Optional.empty();
}
void store(UnitOfWork unitOfWork) {
Blob blob = store.create();
try (ObjectOutputStream outputStream = new ObjectOutputStream(blob.getOutputStream())) {
outputStream.writeObject(unitOfWork);
blob.commit();
unitOfWork.assignStorageId(blob.getId());
} catch (IOException ex) {
throw new NonPersistableTaskException("Failed to persist task", ex);
}
}
void remove(UnitOfWork unitOfWork) {
unitOfWork.getStorageId().ifPresent(store::remove);
}
}

View File

@@ -0,0 +1,66 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.work;
import lombok.EqualsAndHashCode;
import javax.annotation.Nullable;
import java.io.Serializable;
@EqualsAndHashCode
final class Resource implements Serializable {
private final String name;
@Nullable
private final String id;
Resource(String name) {
this.name = name;
this.id = null;
}
Resource(String name, @Nullable String id) {
this.name = name;
this.id = id;
}
boolean isBlockedBy(Resource resource) {
if (name.equals(resource.name)) {
if (id != null && resource.id != null) {
return id.equals(resource.id);
}
return true;
}
return false;
}
@Override
public String toString() {
if (id != null) {
return name + ":" + id;
}
return name;
}
}

View File

@@ -0,0 +1,48 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.work;
import com.google.inject.Injector;
import lombok.EqualsAndHashCode;
import org.apache.shiro.subject.PrincipalCollection;
import java.util.Set;
@EqualsAndHashCode(callSuper = true)
class SimpleUnitOfWork extends UnitOfWork {
private final Task task;
SimpleUnitOfWork(long order, PrincipalCollection principal, Set<Resource> locks, Task task) {
super(order, principal, locks);
this.task = task;
}
@Override
protected Task task(Injector injector) {
injector.injectMembers(task);
return task;
}
}

View File

@@ -0,0 +1,79 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.work;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.IntSupplier;
public class ThreadCountProvider implements IntSupplier {
private static final Logger LOG = LoggerFactory.getLogger(ThreadCountProvider.class);
@VisibleForTesting
static final String PROPERTY = "scm.central-work-queue.workers";
private final IntSupplier cpuCountProvider;
public ThreadCountProvider() {
this(() -> Runtime.getRuntime().availableProcessors());
}
@VisibleForTesting
ThreadCountProvider(IntSupplier cpuCountProvider) {
this.cpuCountProvider = cpuCountProvider;
}
@Override
public int getAsInt() {
Integer systemProperty = Integer.getInteger(PROPERTY);
if (systemProperty == null) {
LOG.debug("derive worker count from cpu count");
return deriveFromCPUCount();
}
if (isInvalid(systemProperty)) {
LOG.warn(
"system property {} contains a invalid value {}, fall back and derive worker count from cpu count",
PROPERTY, systemProperty
);
return deriveFromCPUCount();
}
return systemProperty;
}
private boolean isInvalid(int value) {
return value <= 0 || value > 64;
}
private int deriveFromCPUCount() {
int cpus = cpuCountProvider.getAsInt();
if (cpus > 1) {
return 4;
}
return 2;
}
}

View File

@@ -0,0 +1,149 @@
/*
* MIT License
*
* Copyright (c) 2020-present Cloudogu GmbH and Contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package sonia.scm.work;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.inject.Injector;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.EqualsAndHashCode;
import org.apache.shiro.subject.PrincipalCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sonia.scm.security.Impersonator;
import sonia.scm.security.Impersonator.Session;
import java.io.Serializable;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@EqualsAndHashCode
abstract class UnitOfWork implements Runnable, Serializable, Comparable<UnitOfWork> {
@VisibleForTesting
static final String METRIC_EXECUTION = "cwq.task.execution.duration";
@VisibleForTesting
static final String METRIC_WAIT = "cwq.task.wait.duration";
private static final Logger LOG = LoggerFactory.getLogger(UnitOfWork.class);
private long order;
private int blockCount = 0;
private int restoreCount = 0;
private final Set<Resource> locks;
private final PrincipalCollection principal;
private transient Finalizer finalizer;
private transient Runnable task;
private transient MeterRegistry meterRegistry;
private transient Impersonator impersonator;
private transient long createdAt;
private transient String storageId;
protected UnitOfWork(long order, PrincipalCollection principal, Set<Resource> locks) {
this.order = order;
this.principal = principal;
this.locks = locks;
this.createdAt = System.nanoTime();
}
public long getOrder() {
return order;
}
public void restore(long newOrderId) {
this.order = newOrderId;
this.createdAt = System.nanoTime();
this.restoreCount++;
}
public int getRestoreCount() {
return restoreCount;
}
public void blocked() {
blockCount++;
}
public void assignStorageId(String storageId) {
this.storageId = storageId;
}
public Optional<String> getStorageId() {
return Optional.ofNullable(storageId);
}
public Set<Resource> getLocks() {
return locks;
}
void init(Injector injector, Finalizer finalizer, MeterRegistry meterRegistry) {
this.task = task(injector);
this.finalizer = finalizer;
this.meterRegistry = meterRegistry;
this.impersonator = injector.getInstance(Impersonator.class);
}
protected abstract Runnable task(Injector injector);
@Override
public void run() {
Stopwatch sw = Stopwatch.createStarted();
Timer.Sample sample = Timer.start(meterRegistry);
try (Session session = impersonator.impersonate(principal)) {
task.run();
LOG.debug("task {} finished successful after {}", task, sw.stop());
} catch (Exception ex) {
LOG.error("task {} failed after {}", task, sw.stop(), ex);
} finally {
sample.stop(createExecutionTimer());
createWaitTimer().record(System.nanoTime() - createdAt, TimeUnit.NANOSECONDS);
finalizer.finalizeWork(this);
}
}
private Timer createExecutionTimer() {
return Timer.builder(METRIC_EXECUTION)
.description("Central work queue task execution duration")
.tags("task", task.getClass().getName())
.register(meterRegistry);
}
private Timer createWaitTimer() {
return Timer.builder(METRIC_WAIT)
.description("Central work queue task wait duration")
.tags("task", task.getClass().getName(), "restores", String.valueOf(restoreCount), "blocked", String.valueOf(blockCount))
.register(meterRegistry);
}
@Override
public int compareTo(UnitOfWork o) {
return Long.compare(order, o.order);
}
}