Messaging Service Guide
What you'll build
You will build a messaging service that enables persisting and fetching messages
What you’ll need
An Integrated Developer Environment (IDE)
Popular choices include IntelliJ IDEA, Spring Tools, Visual Studio Code, or Eclipse, and many more.
A Java™ Development Kit (JDK) installed. We recommend AdoptOpenJDK version 11 or version 14.
Running PostgreSQL database with ‘FlexiCore’ database acceesible by user ‘FlexiCore’ identified by password ‘FlexiCore’
Running MongoDB database
FlexiCore executable at /home/FlexiCore or any other location , FlexiCore’s latest executable can be obtained from the release section.
Environment can be easily installed using Wizzdi Setup , latest Wizzdi Setup is available here.
at this phase we will create a service allowing users to save and fetch messages.
managing entities for this phase:
Create a new Maven Project with the following structure:
update your pom.xml from here
Open up the project in your IDE and create the Message.java
file in the src/main/java/com/wizzdi/messaging/model
folder. Now change the contents of the file by adding the extra method and annotations shown in the code below. You can copy and paste the code or just type it.
package com.wizzdi.messaging.model;
import com.flexicore.model.Baseclass;
import com.flexicore.model.User;
import com.flexicore.security.SecurityContext;
import javax.persistence.Entity;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
@Entity
public class Message extends Baseclass {
private String subject;
@Lob
private String content;
@ManyToOne(targetEntity = User.class)
private User fromUser;
@ManyToOne(targetEntity = User.class)
private User toUser;
public Message() {
}
public Message(String name, SecurityContext securityContext) {
super(name, securityContext);
}
public String getSubject() {
return subject;
}
public <T extends Message> T setSubject(String subject) {
this.subject = subject;
return (T) this;
}
@Lob
public String getContent() {
return content;
}
public <T extends Message> T setContent(String content) {
this.content = content;
return (T) this;
}
@ManyToOne(targetEntity = User.class)
public User getFromUser() {
return fromUser;
}
public <T extends Message> T setFromUser(User fromUser) {
this.fromUser = fromUser;
return (T) this;
}
@ManyToOne(targetEntity = User.class)
public User getToUser() {
return toUser;
}
public <T extends Message> T setToUser(User toUser) {
this.toUser = toUser;
return (T) this;
}
}
Baseclass
since we want to be able to govern permissions for it@Lob
annotation since we want the underlaying database to create this field in a format suitable for long strings (rather than limited space varchar)copy persistence.xml content from here.
this will allow automatic generation of JPA metamodels required to implement Criteria API based queries.
./mvn install
./cp target/messaging-model-1.0.0-jar /home/FlexiCore/entities
create a maven project with the following structure:
update your pom.xml from here
lets define objects that will be consumed by our api:
MessageCreate
– this object will contain all required details for creating a message ( from user , to user , content and subject) it will also inherit from BaseclassCreate
object as we would like to extend Baseclass
capabilitiesMessageUpdate
– this object will extend MessageCreate
object and id of the message to updateMessageFilter
– this object will be sent by the client when fetching messages and will contain filtering options on messagespackage com.wizzdi.messaging.request;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.flexicore.model.User;
import com.flexicore.request.BaseclassCreate;
public class MessageCreate extends BaseclassCreate {
private String subject;
private String content;
private String fromUserId;
@JsonIgnore
private User fromUser;
private String toUserId;
@JsonIgnore
private User toUser;
public String getSubject() {
return subject;
}
public <T extends MessageCreate> T setSubject(String subject) {
this.subject = subject;
return (T) this;
}
public String getContent() {
return content;
}
public <T extends MessageCreate> T setContent(String content) {
this.content = content;
return (T) this;
}
public String getFromUserId() {
return fromUserId;
}
public <T extends MessageCreate> T setFromUserId(String fromUserId) {
this.fromUserId = fromUserId;
return (T) this;
}
@JsonIgnore
public User getFromUser() {
return fromUser;
}
public <T extends MessageCreate> T setFromUser(User fromUser) {
this.fromUser = fromUser;
return (T) this;
}
public String getToUserId() {
return toUserId;
}
public <T extends MessageCreate> T setToUserId(String toUserId) {
this.toUserId = toUserId;
return (T) this;
}
@JsonIgnore
public User getToUser() {
return toUser;
}
public <T extends MessageCreate> T setToUser(User toUser) {
this.toUser = toUser;
return (T) this;
}
}
package com.wizzdi.messaging.request;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.flexicore.model.FilteringInformationHolder;
import com.flexicore.model.User;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class MessageFilter extends FilteringInformationHolder {
private String contentLike;
private String subjectLike;
private Set<String> fromUsersIds=new HashSet<>();
@JsonIgnore
private List<User> fromUsers;
private Set<String> toUsersIds=new HashSet<>();
@JsonIgnore
private List<User> toUsers;
public String getContentLike() {
return contentLike;
}
public <T extends MessageFilter> T setContentLike(String contentLike) {
this.contentLike = contentLike;
return (T) this;
}
public String getSubjectLike() {
return subjectLike;
}
public <T extends MessageFilter> T setSubjectLike(String subjectLike) {
this.subjectLike = subjectLike;
return (T) this;
}
public Set<String> getFromUsersIds() {
return fromUsersIds;
}
public <T extends MessageFilter> T setFromUsersIds(Set<String> fromUsersIds) {
this.fromUsersIds = fromUsersIds;
return (T) this;
}
@JsonIgnore
public List<User> getFromUsers() {
return fromUsers;
}
public <T extends MessageFilter> T setFromUsers(List<User> fromUsers) {
this.fromUsers = fromUsers;
return (T) this;
}
public Set<String> getToUsersIds() {
return toUsersIds;
}
public <T extends MessageFilter> T setToUsersIds(Set<String> toUsersIds) {
this.toUsersIds = toUsersIds;
return (T) this;
}
@JsonIgnore
public List<User> getToUsers() {
return toUsers;
}
public <T extends MessageFilter> T setToUsers(List<User> toUsers) {
this.toUsers = toUsers;
return (T) this;
}
}
package com.wizzdi.messaging.request;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.wizzdi.messaging.model.Message;
public class MessageUpdate extends MessageCreate{
private String id;
@JsonIgnore
private Message message;
public String getId() {
return id;
}
public <T extends MessageUpdate> T setId(String id) {
this.id = id;
return (T) this;
}
@JsonIgnore
public Message getMessage() {
return message;
}
public <T extends MessageUpdate> T setMessage(Message message) {
this.message = message;
return (T) this;
}
}
lets define the repository that will be used to fetch and save messages from database.
package com.wizzdi.messaging.data;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.interfaces.AbstractRepositoryPlugin;
import com.flexicore.model.QueryInformationHolder;
import com.flexicore.model.User;
import com.flexicore.model.User_;
import com.flexicore.security.SecurityContext;
import com.wizzdi.messaging.model.Message;
import com.wizzdi.messaging.model.Message_;
import com.wizzdi.messaging.request.MessageFilter;
import org.pf4j.Extension;
import org.springframework.stereotype.Component;
import javax.persistence.criteria.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@Extension
@PluginInfo(version = 1)
@Component
public class MessageRepository extends AbstractRepositoryPlugin {
public List<Message> listAllMessages(
MessageFilter filtering, SecurityContext securityContext) {
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<Message> q = cb.createQuery(Message.class);
Root<Message> r = q.from(Message.class);
List<Predicate> preds = new ArrayList<>();
addMessagePredicates(preds, r, cb, filtering);
QueryInformationHolder<Message> queryInformationHolder = new QueryInformationHolder<>(filtering, Message.class, securityContext);
return getAllFiltered(queryInformationHolder, preds, cb, q, r);
}
public long countAllMessages(MessageFilter filtering, SecurityContext securityContext) {
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<Long> q = cb.createQuery(Long.class);
Root<Message> r = q.from(Message.class);
List<Predicate> preds = new ArrayList<>();
addMessagePredicates(preds, r, cb, filtering);
QueryInformationHolder<Message> queryInformationHolder = new QueryInformationHolder<>(filtering, Message.class, securityContext);
return countAllFiltered(queryInformationHolder, preds, cb, q, r);
}
private void addMessagePredicates(List<Predicate> preds, Root<Message> r, CriteriaBuilder cb, MessageFilter filtering) {
if(filtering.getFromUsers()!=null &&!filtering.getFromUsers().isEmpty()){
Set<String> fromIds=filtering.getFromUsers().stream().map(f->f.getId()).collect(Collectors.toSet());
Join<Message, User> join=r.join(Message_.fromUser);
preds.add(join.get(User_.id).in(fromIds));
}
if(filtering.getToUsers()!=null &&!filtering.getToUsers().isEmpty()){
Set<String> fromIds=filtering.getToUsers().stream().map(f->f.getId()).collect(Collectors.toSet());
Join<Message, User> join=r.join(Message_.toUser);
preds.add(join.get(User_.id).in(fromIds));
}
if(filtering.getContentLike()!=null){
preds.add(cb.like(r.get(Message_.content),filtering.getContentLike()));
}
if(filtering.getSubjectLike()!=null){
preds.add(cb.like(r.get(Message_.subject),filtering.getSubjectLike()));
}
}
}
@Extension
annotation to allow FlexiCore to load it as a plugin @PluginInfo
annotation to allow future versioning support@Component
annotation to let spring know it is a bean.AbstractRepositoryPlugin
which provides easy method for access control and out of the box methods for persisting objectsaddMessagePredicates
which adds the required predicates , all access control predicates are automatically added when countAllFiltered
and getAllFiltered
are calledaddMessagePredicates
uses JPA Criteria Api to filter data based on MessageFilter
object we have created in previous phase.lets define the service that will be used by other plugins and REST api (or any other API implementations for that matter)
package com.wizzdi.messaging.service;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.data.jsoncontainers.PaginationResponse;
import com.flexicore.interfaces.ServicePlugin;
import com.flexicore.model.Baseclass;
import com.flexicore.model.User;
import com.flexicore.security.SecurityContext;
import com.flexicore.service.BaseclassNewService;
import com.wizzdi.messaging.data.MessageRepository;
import com.wizzdi.messaging.model.Message;
import com.wizzdi.messaging.request.MessageCreate;
import com.wizzdi.messaging.request.MessageFilter;
import com.wizzdi.messaging.request.MessageUpdate;
import org.pf4j.Extension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import javax.ws.rs.BadRequestException;
import java.util.*;
import java.util.stream.Collectors;
@Extension
@PluginInfo(version = 1)
@Component
public class MessageService implements ServicePlugin {
@Autowired
private BaseclassNewService baseclassNewService;
@Autowired
@PluginInfo(version = 1)
private MessageRepository messageRepository;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public Message createMessage(MessageCreate messageCreate, SecurityContext securityContext) {
Message message=createMessageNoMerge(messageCreate,securityContext);
messageRepository.merge(message);
applicationEventPublisher.publishEvent(message);
return message;
}
public boolean updateMessageNoMerge(MessageCreate messageCreate,Message message){
boolean update=baseclassNewService.updateBaseclassNoMerge(messageCreate,message);
if(messageCreate.getContent()!=null&&!messageCreate.getContent().equals(message.getContent())){
message.setContent(messageCreate.getContent());
update=true;
}
if(messageCreate.getSubject()!=null&&!messageCreate.getSubject().equals(message.getSubject())){
message.setSubject(messageCreate.getSubject());
update=true;
}
if(messageCreate.getFromUser()!=null&&(message.getFromUser()==null||!messageCreate.getFromUser().getId().equals(message.getFromUser().getId()))){
message.setFromUser(messageCreate.getFromUser());
update=true;
}
if(messageCreate.getToUser()!=null&&(message.getToUser()==null||!messageCreate.getToUser().getId().equals(message.getToUser().getId()))){
message.setToUser(messageCreate.getToUser());
update=true;
}
return update;
}
private Message createMessageNoMerge(MessageCreate messageCreate, SecurityContext securityContext) {
Message message=new Message(messageCreate.getName(),securityContext);
updateMessageNoMerge(messageCreate,message);
return message;
}
public void validateCreate(MessageCreate messageCreate,SecurityContext securityContext){
validate(messageCreate,securityContext);
if(messageCreate.getToUser()==null){
throw new BadRequestException("No User with id "+messageCreate.getToUserId());
}
}
public void validate(MessageCreate messageCreate, SecurityContext securityContext) {
String fromUserId=messageCreate.getFromUserId();
User fromUser=fromUserId!=null?messageRepository.getByIdOrNull(fromUserId,User.class,null,securityContext):securityContext.getUser();
messageCreate.setFromUser(fromUser);
String toUserId=messageCreate.getToUserId();
User toUser=toUserId!=null?messageRepository.getByIdOrNull(toUserId,User.class,null,securityContext):null;
if(toUserId!=null&&toUser==null){
throw new BadRequestException("No User with id "+toUserId);
}
messageCreate.setToUser(toUser);
}
public void validate(MessageFilter filtering, SecurityContext securityContext) {
baseclassNewService.validateFilter(filtering,securityContext);
Set<String> fromUserIds=filtering.getFromUsersIds();
Map<String, User> fromUsers=fromUserIds.isEmpty()?new HashMap<>():messageRepository.listByIds(User.class,fromUserIds,securityContext).stream().collect(Collectors.toMap(f->f.getId(),f->f));
fromUserIds.removeAll(fromUsers.keySet());
if(!fromUserIds.isEmpty()){
throw new BadRequestException("No Users with ids "+fromUserIds);
}
filtering.setFromUsers(new ArrayList<>(fromUsers.values()));
Set<String> toUsersIds=filtering.getToUsersIds();
Map<String, User> toUsers=toUsersIds.isEmpty()?new HashMap<>():messageRepository.listByIds(User.class,toUsersIds,securityContext).stream().collect(Collectors.toMap(f->f.getId(),f->f));
toUsersIds.removeAll(toUsers.keySet());
if(!toUsersIds.isEmpty()){
throw new BadRequestException("No Users with ids "+toUsersIds);
}
filtering.setToUsers(new ArrayList<>(toUsers.values()));
}
public PaginationResponse<Message> getAllMessages(MessageFilter filtering, SecurityContext securityContext) {
List<Message> list=listAllMessages(filtering,securityContext);
long count=messageRepository.countAllMessages(filtering,securityContext);
return new PaginationResponse<>(list,filtering,count);
}
private List<Message> listAllMessages(MessageFilter filtering, SecurityContext securityContext) {
return messageRepository.listAllMessages(filtering,securityContext);
}
public Message updateMessage(MessageUpdate messageUpdate, SecurityContext securityContext) {
Message message=messageUpdate.getMessage();
if(updateMessageNoMerge(messageUpdate,message)){
messageRepository.merge(message);
}
return message;
}
public <T extends Baseclass> T getByIdOrNull(String id, Class<T> c, List<String> batchString, SecurityContext securityContext) {
return messageRepository.getByIdOrNull(id, c, batchString, securityContext);
}
}
lets define the REST service that will expose REST api that our clients will use:
package com.wizzdi.messaging.rest;
import com.flexicore.annotations.OperationsInside;
import com.flexicore.annotations.ProtectedREST;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.data.jsoncontainers.PaginationResponse;
import com.flexicore.interfaces.RestServicePlugin;
import com.flexicore.security.SecurityContext;
import com.wizzdi.messaging.model.Message;
import com.wizzdi.messaging.request.MessageCreate;
import com.wizzdi.messaging.request.MessageFilter;
import com.wizzdi.messaging.request.MessageUpdate;
import com.wizzdi.messaging.service.MessageService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.pf4j.Extension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import java.util.stream.Collectors;
@PluginInfo(version = 1)
@OperationsInside
@ProtectedREST
@Path("plugins/message")
@Tag(name = "message")
@Extension
@Component
public class MessageRESTService implements RestServicePlugin {
@PluginInfo(version = 1)
@Autowired
private MessageService service;
@POST
@Produces("application/json")
@Operation(summary = "getAllMessages", description = "Gets All Messages Filtered")
@Path("getAllMessages")
public PaginationResponse<Message> getAllMessages(
@HeaderParam("authenticationKey") String authenticationKey,
MessageFilter filtering,
@Context SecurityContext securityContext) {
service.validate(filtering, securityContext);
return service.getAllMessages(filtering, securityContext);
}
@POST
@Produces("application/json")
@Operation(summary = "createMessage", description = "creates Message")
@Path("createMessage")
public Message createMessage(
@HeaderParam("authenticationKey") String authenticationKey,
MessageCreate messageCreate,
@Context SecurityContext securityContext) {
service.validateCreate(messageCreate, securityContext);
return service.createMessage(messageCreate, securityContext);
}
@PUT
@Produces("application/json")
@Operation(summary = "updateMessage", description = "Updates Message")
@Path("updateMessage")
public Message updateMessage(
@HeaderParam("authenticationKey") String authenticationKey,
MessageUpdate messageUpdate,
@Context SecurityContext securityContext) {
Message message = messageUpdate.getId() != null ? service.getByIdOrNull(messageUpdate.getId(), Message.class, null, securityContext) : null;
if (message == null) {
throw new BadRequestException("No MessageUpdate with id " + messageUpdate.getId());
}
messageUpdate.setMessage(message);
service.validate(messageUpdate, securityContext);
return service.updateMessage(messageUpdate, securityContext);
}
}
currently clients will have to poll for new messages which will cause excessive CPU usage and bad experience for the users since messages will only update during the polling interval , to solve that we will add a Web Socket endpoint to push new messages to clients.
currently we will only support a single new web socket message called NewMessage
but we will leave room for more message types that implement IWSMessage
interface.
package com.wizzdi.messaging.websocket.messages;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
import com.flexicore.data.jsoncontainers.CrossLoaderResolver;
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type")
@JsonTypeIdResolver(CrossLoaderResolver.class)
public interface IWSMessage {
Set<String> getTarget();
}
package com.wizzdi.messaging.websocket.messages;
import com.wizzdi.messaging.model.Message;
import java.util.Collections;
import java.util.Set;
public class NewMessage implements IWSMessage{
private Message newMessage;
public Message getNewMessage() {
return newMessage;
}
public <T extends NewMessage> T setNewMessage(Message newMessage) {
this.newMessage = newMessage;
return (T) this;
}
@Override
public Set<String> getTarget() {
return Collections.singleton(newMessage.getToUser().getId());
}
}
Registration of the new type is done by a component listening on PluginsLoadedEvent
– an event sent after plugins beans have been registered.
package com.wizzdi.messaging.config;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.data.jsoncontainers.CrossLoaderResolver;
import com.flexicore.events.PluginsLoadedEvent;
import com.flexicore.interfaces.ServicePlugin;
import com.wizzdi.messaging.websocket.messages.NewMessage;
import org.pf4j.Extension;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@PluginInfo(version = 1)
@Component
@Extension
public class Config implements ServicePlugin {
@EventListener
public void init(PluginsLoadedEvent pluginsLoadedEvent){
CrossLoaderResolver.registerClass(NewMessage.class);
}
}
package com.wizzdi.messaging.websocket.encoders;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.flexicore.data.jsoncontainers.Views;
import com.wizzdi.messaging.websocket.messages.IWSMessage;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
import java.io.IOException;
import java.io.Writer;
public class MessageMessageEncoder implements Encoder.TextStream<IWSMessage> {
private ObjectMapper objectMapper;
@Override
public void init(EndpointConfig config) {
this.objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
}
@Override
public void destroy() {
}
@Override
public void encode(IWSMessage object, Writer writer) throws IOException {
objectMapper.writeValue(writer, object);
}
}
we are using Jackson’s ObjectMapper
to serialize the messages.
package com.wizzdi.messaging.websocket;
import com.flexicore.annotations.Protected;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.interfaces.WebSocketPlugin;
import com.flexicore.model.User;
import com.flexicore.security.SecurityContext;
import com.wizzdi.messaging.websocket.encoders.MessageMessageEncoder;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@ServerEndpoint(value = "/FlexiCore/messageWS/{authenticationKey}", encoders = {MessageMessageEncoder.class})
@PluginInfo(version = 1)
@Protected
@Extension
@Component
public class MessageWebSocket implements WebSocketPlugin {
private static final Logger logger= LoggerFactory.getLogger(MessageWebSocket.class);
private final static Map<String, Session> sessionMap = new ConcurrentHashMap<>();
private static final String SECURITY_CONTEXT_KEY = "securityContext";
@OnMessage
public void receiveMessage(String message, Session session) {
logger.info("Received : " + message + ", session:" + session.getId());
}
@OnOpen
public void open(@PathParam("authenticationKey") String authenticationKey, Session session) {
String id = session.getId();
logger.info("Open session:" + id);
sessionMap.put(id, session);
}
@OnClose
public void close(@PathParam("authenticationKey") String authenticationKey,Session session, CloseReason c) {
logger.info("Closing:" + session.getId());
sessionMap.remove(session.getId());
}
public static List<Session> getSessionsForUsers(Set<String> targetUserIds) {
return sessionMap.values().stream()
.filter(f->f.getUserProperties()!=null&&
f.getUserProperties().containsKey(SECURITY_CONTEXT_KEY)&&
targetUserIds.contains(((SecurityContext)f.getUserProperties().get(SECURITY_CONTEXT_KEY)).getUser().getId()))
.collect(Collectors.toList());
}
public static void removeAll(List<Session> sessions){
for (Session session : sessions) {
sessionMap.remove(session.getId(),session);
}
}
}
the @ServerEndpoint
annotation registers this class as a websocket endpoint and registers the MessageMessageEncoder
class as the encoder for outgoing messages , we have added a path parameter place holder so the authnetication key can be provided by clients.
the @Protected
annotation is being used to automatically autheticate the incoming authetication key and push the SecurityContext
into the sessions properties. once a client connects we add it to a map of running sessions.
package com.wizzdi.messaging.websocket;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.interfaces.ServicePlugin;
import com.wizzdi.messaging.websocket.messages.IWSMessage;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.websocket.EncodeException;
import javax.websocket.Session;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@PluginInfo(version = 1)
@Extension
@Component
public class MessageSender implements ServicePlugin {
private static final Logger logger=LoggerFactory.getLogger(MessageSender.class);
@Async
@EventListener
public void sendEvent(IWSMessage iwsMessage) {
List<Session> toRemove = new ArrayList<>();
List<Session> sessionsForUsers = MessageWebSocket.getSessionsForUsers(iwsMessage.getTarget());
logger.info("Received ui event " + iwsMessage + " to send to " + sessionsForUsers.size() + " sessions");
for (Session session : sessionsForUsers) {
try {
if (!session.isOpen()) {
toRemove.add(session);
continue;
}
session.getBasicRemote().sendObject(iwsMessage);
} catch (EncodeException | IOException e) {
logger.error( "unable to send message", e);
try {
session.close();
} catch (IOException e1) {
logger.error( "unable to close session");
}
toRemove.add(session);
}
}
MessageWebSocket.removeAll(toRemove);
}
}
the MessageSender
will listen to events of type IWSMessage
and will send them to relevant sessions ( this is determined by the getTargetIds
method on the message)
now we will need to send the NewMessage
event when a message is created by updating createMessage
method in MessageService
:
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public Message createMessage(MessageCreate messageCreate, SecurityContext securityContext) {
Message message=createMessageNoMerge(messageCreate,securityContext);
messageRepository.merge(message);
applicationEventPublisher.publishEvent(message);
return message;
}
Success Stories