Messaging Service Guide

What you'll build

You will build a messaging service that enables persisting and fetching messages 

What you’ll need

An Integrated Developer Environment (IDE)

Popular choices include IntelliJ IDEA, Spring Tools, Visual Studio Code, or Eclipse, and many more.

A Java™ Development Kit (JDK) installed. We recommend AdoptOpenJDK version 11 or version 14.

Running PostgreSQL database with ‘FlexiCore’ database acceesible by user ‘FlexiCore’ identified by password ‘FlexiCore’

Running MongoDB database

FlexiCore executable at /home/FlexiCore or any other location , FlexiCore’s latest executable can be obtained from the release section.

Environment can be easily installed using Wizzdi Setup , latest Wizzdi Setup is available here.

Phase 1: Basic Messaging Service

at this phase we will create a service allowing users to save and fetch messages.

Step 1.a: Design Basic Model

managing entities for this phase:

  1. Users – we can use the existing User model FlexiCore provides to model message source and target
  2. Message – we need to model a single message , this object will contain a from/to users the contant and the subject of the message

 

Create a new Maven Project with the following structure:

update your pom.xml from here

Step 1.b: Create Message Entity

Open up the project in your IDE and create the Message.java file in the src/main/java/com/wizzdi/messaging/model folder. Now change the contents of the file by adding the extra method and annotations shown in the code below. You can copy and paste the code or just type it.

package com.wizzdi.messaging.model;
import com.flexicore.model.Baseclass;
import com.flexicore.model.User;
import com.flexicore.security.SecurityContext;
import javax.persistence.Entity;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
@Entity
public class Message extends Baseclass {
    private String subject;
    @Lob
    private String content;
    @ManyToOne(targetEntity = User.class)
    private User fromUser;
    @ManyToOne(targetEntity = User.class)
    private User toUser;
    public Message() {
    }
    public Message(String name, SecurityContext securityContext) {
        super(name, securityContext);
    }
    public String getSubject() {
        return subject;
    }
    public <T extends Message> T setSubject(String subject) {
        this.subject = subject;
        return (T) this;
    }
    @Lob
    public String getContent() {
        return content;
    }
    public <T extends Message> T setContent(String content) {
        this.content = content;
        return (T) this;
    }
    @ManyToOne(targetEntity = User.class)
    public User getFromUser() {
        return fromUser;
    }
    public <T extends Message> T setFromUser(User fromUser) {
        this.fromUser = fromUser;
        return (T) this;
    }
    @ManyToOne(targetEntity = User.class)
    public User getToUser() {
        return toUser;
    }
    public <T extends Message> T setToUser(User toUser) {
        this.toUser = toUser;
        return (T) this;
    }
} 
  1. Our Message object inherits from Baseclass since we want to be able to govern permissions for it
  2. content field is annotated with @Lob annotation since we want the underlaying database to create this field in a format suitable for long strings (rather than limited space varchar)

Step 1.c: Create Persistence.xml

copy persistence.xml content from here.

this will allow automatic generation of JPA metamodels required to implement Criteria API based queries.

Step 1.d: install messaging model

./mvn install

./cp target/messaging-model-1.0.0-jar /home/FlexiCore/entities

Step 1.e: create Message service plugin

create a maven project with the following structure:

update your pom.xml from here

Step 1.f: Create Message service request objects

lets define objects that will be consumed by our api:

  1. MessageCreate – this object will contain all required details for creating a message ( from user , to user , content and subject) it will also inherit from BaseclassCreate object as we would like to extend Baseclass capabilities
  2. MessageUpdate – this object will extend MessageCreate object and id of the message to update
  3. MessageFilter – this object will be sent by the client when fetching messages and will contain filtering options on messages
package com.wizzdi.messaging.request;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.flexicore.model.User;
import com.flexicore.request.BaseclassCreate;

public class MessageCreate extends BaseclassCreate {
    private String subject;
    private String content;
    private String fromUserId;
    @JsonIgnore
    private User fromUser;
    private String toUserId;
    @JsonIgnore
    private User toUser;

    public String getSubject() {
        return subject;
    }

    public <T extends MessageCreate> T setSubject(String subject) {
        this.subject = subject;
        return (T) this;
    }

    public String getContent() {
        return content;
    }

    public <T extends MessageCreate> T setContent(String content) {
        this.content = content;
        return (T) this;
    }

    public String getFromUserId() {
        return fromUserId;
    }

    public <T extends MessageCreate> T setFromUserId(String fromUserId) {
        this.fromUserId = fromUserId;
        return (T) this;
    }

    @JsonIgnore
    public User getFromUser() {
        return fromUser;
    }

    public <T extends MessageCreate> T setFromUser(User fromUser) {
        this.fromUser = fromUser;
        return (T) this;
    }

    public String getToUserId() {
        return toUserId;
    }

    public <T extends MessageCreate> T setToUserId(String toUserId) {
        this.toUserId = toUserId;
        return (T) this;
    }

    @JsonIgnore
    public User getToUser() {
        return toUser;
    }

    public <T extends MessageCreate> T setToUser(User toUser) {
        this.toUser = toUser;
        return (T) this;
    }
} 
package com.wizzdi.messaging.request;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.flexicore.model.FilteringInformationHolder;
import com.flexicore.model.User;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MessageFilter extends FilteringInformationHolder {

    private String contentLike;
    private String subjectLike;
    private Set<String> fromUsersIds=new HashSet<>();
    @JsonIgnore
    private List<User> fromUsers;
    private Set<String> toUsersIds=new HashSet<>();
    @JsonIgnore
    private List<User> toUsers;

    public String getContentLike() {
        return contentLike;
    }

    public <T extends MessageFilter> T setContentLike(String contentLike) {
        this.contentLike = contentLike;
        return (T) this;
    }

    public String getSubjectLike() {
        return subjectLike;
    }

    public <T extends MessageFilter> T setSubjectLike(String subjectLike) {
        this.subjectLike = subjectLike;
        return (T) this;
    }

    public Set<String> getFromUsersIds() {
        return fromUsersIds;
    }

    public <T extends MessageFilter> T setFromUsersIds(Set<String> fromUsersIds) {
        this.fromUsersIds = fromUsersIds;
        return (T) this;
    }

    @JsonIgnore
    public List<User> getFromUsers() {
        return fromUsers;
    }

    public <T extends MessageFilter> T setFromUsers(List<User> fromUsers) {
        this.fromUsers = fromUsers;
        return (T) this;
    }

    public Set<String> getToUsersIds() {
        return toUsersIds;
    }

    public <T extends MessageFilter> T setToUsersIds(Set<String> toUsersIds) {
        this.toUsersIds = toUsersIds;
        return (T) this;
    }

    @JsonIgnore
    public List<User> getToUsers() {
        return toUsers;
    }

    public <T extends MessageFilter> T setToUsers(List<User> toUsers) {
        this.toUsers = toUsers;
        return (T) this;
    }
} 
package com.wizzdi.messaging.request;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.wizzdi.messaging.model.Message;

public class MessageUpdate extends MessageCreate{
    private String id;
    @JsonIgnore
    private Message message;

    public String getId() {
        return id;
    }

    public <T extends MessageUpdate> T setId(String id) {
        this.id = id;
        return (T) this;
    }

    @JsonIgnore
    public Message getMessage() {
        return message;
    }

    public <T extends MessageUpdate> T setMessage(Message message) {
        this.message = message;
        return (T) this;
    }
} 

Step 1.g: Create Message Repository

lets define the repository that will be used to fetch and save messages from database.

package com.wizzdi.messaging.data;

import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.interfaces.AbstractRepositoryPlugin;
import com.flexicore.model.QueryInformationHolder;
import com.flexicore.model.User;
import com.flexicore.model.User_;
import com.flexicore.security.SecurityContext;
import com.wizzdi.messaging.model.Message;
import com.wizzdi.messaging.model.Message_;
import com.wizzdi.messaging.request.MessageFilter;
import org.pf4j.Extension;
import org.springframework.stereotype.Component;

import javax.persistence.criteria.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@Extension
@PluginInfo(version = 1)
@Component
public class MessageRepository extends AbstractRepositoryPlugin {
    public List<Message> listAllMessages(
            MessageFilter filtering, SecurityContext securityContext) {
        CriteriaBuilder cb = em.getCriteriaBuilder();
        CriteriaQuery<Message> q = cb.createQuery(Message.class);
        Root<Message> r = q.from(Message.class);
        List<Predicate> preds = new ArrayList<>();
        addMessagePredicates(preds, r, cb, filtering);
        QueryInformationHolder<Message> queryInformationHolder = new QueryInformationHolder<>(filtering, Message.class, securityContext);
        return getAllFiltered(queryInformationHolder, preds, cb, q, r);

    }

    public long countAllMessages(MessageFilter filtering, SecurityContext securityContext) {
        CriteriaBuilder cb = em.getCriteriaBuilder();
        CriteriaQuery<Long> q = cb.createQuery(Long.class);
        Root<Message> r = q.from(Message.class);
        List<Predicate> preds = new ArrayList<>();
        addMessagePredicates(preds, r, cb, filtering);
        QueryInformationHolder<Message> queryInformationHolder = new QueryInformationHolder<>(filtering, Message.class, securityContext);
        return countAllFiltered(queryInformationHolder, preds, cb, q, r);
    }

    private void addMessagePredicates(List<Predicate> preds, Root<Message> r, CriteriaBuilder cb, MessageFilter filtering) {
        if(filtering.getFromUsers()!=null &&!filtering.getFromUsers().isEmpty()){
            Set<String> fromIds=filtering.getFromUsers().stream().map(f->f.getId()).collect(Collectors.toSet());
            Join<Message, User> join=r.join(Message_.fromUser);
            preds.add(join.get(User_.id).in(fromIds));
        }

        if(filtering.getToUsers()!=null &&!filtering.getToUsers().isEmpty()){
            Set<String> fromIds=filtering.getToUsers().stream().map(f->f.getId()).collect(Collectors.toSet());
            Join<Message, User> join=r.join(Message_.toUser);
            preds.add(join.get(User_.id).in(fromIds));
        }
        if(filtering.getContentLike()!=null){
            preds.add(cb.like(r.get(Message_.content),filtering.getContentLike()));
        }
        if(filtering.getSubjectLike()!=null){
            preds.add(cb.like(r.get(Message_.subject),filtering.getSubjectLike()));
        }
    }
} 
  1. the repository is annotated by
    •  @Extension annotation to allow FlexiCore to load it as a plugin 
    • @PluginInfo annotation to allow future versioning support
    • @Component annotation to let spring know it is a bean.
  2. the repository class extends AbstractRepositoryPlugin which provides easy method for access control and out of the box methods for persisting objects
  3. the repository exposes methods for listing and counting messages both are calling the addMessagePredicates which adds the required predicates , all access control predicates are automatically added when countAllFiltered and getAllFiltered are called
  4. addMessagePredicates uses JPA Criteria Api to filter data based on MessageFilter object we have created in previous phase.

Step 1.h: Create Message Service

lets define the service that will be used by other plugins and REST api (or any other API implementations for that matter)

package com.wizzdi.messaging.service;

import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.data.jsoncontainers.PaginationResponse;
import com.flexicore.interfaces.ServicePlugin;
import com.flexicore.model.Baseclass;
import com.flexicore.model.User;
import com.flexicore.security.SecurityContext;
import com.flexicore.service.BaseclassNewService;
import com.wizzdi.messaging.data.MessageRepository;
import com.wizzdi.messaging.model.Message;
import com.wizzdi.messaging.request.MessageCreate;
import com.wizzdi.messaging.request.MessageFilter;
import com.wizzdi.messaging.request.MessageUpdate;
import org.pf4j.Extension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import javax.ws.rs.BadRequestException;
import java.util.*;
import java.util.stream.Collectors;

@Extension
@PluginInfo(version = 1)
@Component
public class MessageService implements ServicePlugin {

    @Autowired
    private BaseclassNewService baseclassNewService;

    @Autowired
    @PluginInfo(version = 1)
    private MessageRepository messageRepository;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;


    public Message createMessage(MessageCreate messageCreate, SecurityContext securityContext) {
        Message message=createMessageNoMerge(messageCreate,securityContext);
        messageRepository.merge(message);
        applicationEventPublisher.publishEvent(message);
        return message;
    }

    public boolean updateMessageNoMerge(MessageCreate messageCreate,Message message){
        boolean update=baseclassNewService.updateBaseclassNoMerge(messageCreate,message);
        if(messageCreate.getContent()!=null&&!messageCreate.getContent().equals(message.getContent())){
            message.setContent(messageCreate.getContent());
            update=true;
        }
        if(messageCreate.getSubject()!=null&&!messageCreate.getSubject().equals(message.getSubject())){
            message.setSubject(messageCreate.getSubject());
            update=true;
        }
        if(messageCreate.getFromUser()!=null&&(message.getFromUser()==null||!messageCreate.getFromUser().getId().equals(message.getFromUser().getId()))){
            message.setFromUser(messageCreate.getFromUser());
            update=true;
        }

        if(messageCreate.getToUser()!=null&&(message.getToUser()==null||!messageCreate.getToUser().getId().equals(message.getToUser().getId()))){
            message.setToUser(messageCreate.getToUser());
            update=true;
        }

        return update;
    }

    private Message createMessageNoMerge(MessageCreate messageCreate, SecurityContext securityContext) {
        Message message=new Message(messageCreate.getName(),securityContext);
        updateMessageNoMerge(messageCreate,message);
        return message;
    }

    public void validateCreate(MessageCreate messageCreate,SecurityContext securityContext){
        validate(messageCreate,securityContext);
        if(messageCreate.getToUser()==null){
            throw new BadRequestException("No User with id "+messageCreate.getToUserId());

        }
    }

    public void validate(MessageCreate messageCreate, SecurityContext securityContext) {
        String fromUserId=messageCreate.getFromUserId();
        User fromUser=fromUserId!=null?messageRepository.getByIdOrNull(fromUserId,User.class,null,securityContext):securityContext.getUser();
        messageCreate.setFromUser(fromUser);

        String toUserId=messageCreate.getToUserId();
        User toUser=toUserId!=null?messageRepository.getByIdOrNull(toUserId,User.class,null,securityContext):null;
        if(toUserId!=null&&toUser==null){
            throw new BadRequestException("No User with id "+toUserId);
        }
        messageCreate.setToUser(toUser);

    }

    public void validate(MessageFilter filtering, SecurityContext securityContext) {
        baseclassNewService.validateFilter(filtering,securityContext);
        Set<String> fromUserIds=filtering.getFromUsersIds();
        Map<String, User> fromUsers=fromUserIds.isEmpty()?new HashMap<>():messageRepository.listByIds(User.class,fromUserIds,securityContext).stream().collect(Collectors.toMap(f->f.getId(),f->f));
        fromUserIds.removeAll(fromUsers.keySet());
        if(!fromUserIds.isEmpty()){
            throw new BadRequestException("No Users with ids "+fromUserIds);
        }
        filtering.setFromUsers(new ArrayList<>(fromUsers.values()));

        Set<String> toUsersIds=filtering.getToUsersIds();
        Map<String, User> toUsers=toUsersIds.isEmpty()?new HashMap<>():messageRepository.listByIds(User.class,toUsersIds,securityContext).stream().collect(Collectors.toMap(f->f.getId(),f->f));
        toUsersIds.removeAll(toUsers.keySet());
        if(!toUsersIds.isEmpty()){
            throw new BadRequestException("No Users with ids "+toUsersIds);
        }
        filtering.setToUsers(new ArrayList<>(toUsers.values()));


    }

    public PaginationResponse<Message> getAllMessages(MessageFilter filtering, SecurityContext securityContext) {
        List<Message> list=listAllMessages(filtering,securityContext);
        long count=messageRepository.countAllMessages(filtering,securityContext);
        return new PaginationResponse<>(list,filtering,count);
    }

    private List<Message> listAllMessages(MessageFilter filtering, SecurityContext securityContext) {
        return messageRepository.listAllMessages(filtering,securityContext);
    }

    public Message updateMessage(MessageUpdate messageUpdate, SecurityContext securityContext) {
        Message message=messageUpdate.getMessage();
        if(updateMessageNoMerge(messageUpdate,message)){
            messageRepository.merge(message);
        }
        return message;
    }

    public <T extends Baseclass> T getByIdOrNull(String id, Class<T> c, List<String> batchString, SecurityContext securityContext) {
        return messageRepository.getByIdOrNull(id, c, batchString, securityContext);
    }
} 

Step 1.h: Create Message REST Service

lets define the REST service that will expose REST api that our clients will use:

package com.wizzdi.messaging.rest;
 

import com.flexicore.annotations.OperationsInside;
import com.flexicore.annotations.ProtectedREST;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.data.jsoncontainers.PaginationResponse;
import com.flexicore.interfaces.RestServicePlugin;
import com.flexicore.security.SecurityContext;
import com.wizzdi.messaging.model.Message;
import com.wizzdi.messaging.request.MessageCreate;
import com.wizzdi.messaging.request.MessageFilter;
import com.wizzdi.messaging.request.MessageUpdate;
import com.wizzdi.messaging.service.MessageService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.pf4j.Extension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import java.util.stream.Collectors;
@PluginInfo(version = 1)
@OperationsInside
@ProtectedREST
@Path("plugins/message")
@Tag(name = "message")
@Extension
@Component
public class MessageRESTService implements RestServicePlugin {
   @PluginInfo(version = 1)
   @Autowired
   private MessageService service;
   @POST
   @Produces("application/json")
   @Operation(summary = "getAllMessages", description = "Gets All Messages Filtered")
   @Path("getAllMessages")
   public PaginationResponse<Message> getAllMessages(
         @HeaderParam("authenticationKey") String authenticationKey,
         MessageFilter filtering,
         @Context SecurityContext securityContext) {
      service.validate(filtering, securityContext);
      return service.getAllMessages(filtering, securityContext);
   }
   @POST
   @Produces("application/json")
   @Operation(summary = "createMessage", description = "creates Message")
   @Path("createMessage")
   public Message createMessage(
         @HeaderParam("authenticationKey") String authenticationKey,
         MessageCreate messageCreate,
         @Context SecurityContext securityContext) {
      service.validateCreate(messageCreate, securityContext);
      return service.createMessage(messageCreate, securityContext);
   }
   @PUT
   @Produces("application/json")
   @Operation(summary = "updateMessage", description = "Updates Message")
   @Path("updateMessage")
   public Message updateMessage(
         @HeaderParam("authenticationKey") String authenticationKey,
         MessageUpdate messageUpdate,
         @Context SecurityContext securityContext) {
      Message message = messageUpdate.getId() != null ? service.getByIdOrNull(messageUpdate.getId(), Message.class, null, securityContext) : null;
      if (message == null) {
         throw new BadRequestException("No MessageUpdate with id " + messageUpdate.getId());
      }
      messageUpdate.setMessage(message);
      service.validate(messageUpdate, securityContext);
      return service.updateMessage(messageUpdate, securityContext);
   }
} 

Phase 2: Add WebSoscket Support

currently clients will have to poll for new messages which will cause excessive CPU usage and bad experience for the users since messages will only update during the polling interval , to solve that we will add a Web Socket endpoint to push new messages to clients.

Step 2.a: Change Message-Service Structure

message service v1.1.0 structure

Step 2.b: Create Web Socket Messages Classes

currently we will only support a single new web socket message called NewMessage but we will leave room for more message types that implement IWSMessage interface.

package com.wizzdi.messaging.websocket.messages;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
import com.flexicore.data.jsoncontainers.CrossLoaderResolver;

import java.util.Set;

@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "type")
@JsonTypeIdResolver(CrossLoaderResolver.class)
public interface IWSMessage {

    Set<String> getTarget();
} 
package com.wizzdi.messaging.websocket.messages;

import com.wizzdi.messaging.model.Message;

import java.util.Collections;
import java.util.Set;

public class NewMessage implements IWSMessage{

    private Message newMessage;

    public Message getNewMessage() {
        return newMessage;
    }

    public <T extends NewMessage> T setNewMessage(Message newMessage) {
        this.newMessage = newMessage;
        return (T) this;
    }

    @Override
    public Set<String> getTarget() {
        return Collections.singleton(newMessage.getToUser().getId());
    }
} 

Registration of the new type is done by a component listening on PluginsLoadedEvent – an event sent after plugins beans have been registered.

package com.wizzdi.messaging.config;

import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.data.jsoncontainers.CrossLoaderResolver;
import com.flexicore.events.PluginsLoadedEvent;
import com.flexicore.interfaces.ServicePlugin;
import com.wizzdi.messaging.websocket.messages.NewMessage;
import org.pf4j.Extension;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@PluginInfo(version = 1)
@Component
@Extension
public class Config implements ServicePlugin {

    @EventListener
    public void init(PluginsLoadedEvent pluginsLoadedEvent){
        CrossLoaderResolver.registerClass(NewMessage.class);
    }
} 

Step 2.c: Create Web Socket Message Encoder , Message WebSocket and Message Sender

package com.wizzdi.messaging.websocket.encoders;
 

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.flexicore.data.jsoncontainers.Views;
import com.wizzdi.messaging.websocket.messages.IWSMessage;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
import java.io.IOException;
import java.io.Writer;

public class MessageMessageEncoder implements Encoder.TextStream<IWSMessage> {
   private ObjectMapper objectMapper;
   @Override
   public void init(EndpointConfig config) {
      this.objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);
   }
   @Override
   public void destroy() {
   }
   @Override
   public void encode(IWSMessage object, Writer writer) throws IOException {
      objectMapper.writeValue(writer, object);
   }
} 

we are using Jackson’s ObjectMapper to serialize the messages.

package com.wizzdi.messaging.websocket;
 

import com.flexicore.annotations.Protected;
import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.interfaces.WebSocketPlugin;
import com.flexicore.model.User;
import com.flexicore.security.SecurityContext;
import com.wizzdi.messaging.websocket.encoders.MessageMessageEncoder;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@ServerEndpoint(value = "/FlexiCore/messageWS/{authenticationKey}", encoders = {MessageMessageEncoder.class})
@PluginInfo(version = 1)
@Protected
@Extension
@Component
public class MessageWebSocket implements WebSocketPlugin {
   private static final Logger logger= LoggerFactory.getLogger(MessageWebSocket.class);
   private final static Map<String, Session> sessionMap = new ConcurrentHashMap<>();
   private static final String SECURITY_CONTEXT_KEY = "securityContext";
   @OnMessage
   public void receiveMessage(String message, Session session) {
      logger.info("Received : " + message + ", session:" + session.getId());
   }
   @OnOpen
   public void open(@PathParam("authenticationKey") String authenticationKey, Session session) {
      String id = session.getId();
      logger.info("Open session:" + id);
      sessionMap.put(id, session);
   }
   @OnClose
   public void close(@PathParam("authenticationKey") String authenticationKey,Session session, CloseReason c) {
      logger.info("Closing:" + session.getId());
      sessionMap.remove(session.getId());
   }
   public static List<Session> getSessionsForUsers(Set<String> targetUserIds) {
      return sessionMap.values().stream()
            .filter(f->f.getUserProperties()!=null&&
                  f.getUserProperties().containsKey(SECURITY_CONTEXT_KEY)&&
                  targetUserIds.contains(((SecurityContext)f.getUserProperties().get(SECURITY_CONTEXT_KEY)).getUser().getId()))
            .collect(Collectors.toList());
   }
   public static void removeAll(List<Session> sessions){
      for (Session session : sessions) {
         sessionMap.remove(session.getId(),session);
      }
   }
} 

the @ServerEndpoint annotation registers this class as a websocket endpoint and registers the MessageMessageEncoder class as the encoder for outgoing messages , we have added a path parameter place holder so the authnetication key can be provided by clients.

the @Protected annotation is being used to automatically autheticate the incoming authetication key and push the SecurityContext into the sessions properties. once a client connects we add it to a map of running sessions.

package com.wizzdi.messaging.websocket;

import com.flexicore.annotations.plugins.PluginInfo;
import com.flexicore.interfaces.ServicePlugin;
import com.wizzdi.messaging.websocket.messages.IWSMessage;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.websocket.EncodeException;
import javax.websocket.Session;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

@PluginInfo(version = 1)
@Extension
@Component
public class MessageSender implements ServicePlugin {


   private static final Logger logger=LoggerFactory.getLogger(MessageSender.class);


   @Async
   @EventListener
   public void sendEvent(IWSMessage iwsMessage) {
      List<Session> toRemove = new ArrayList<>();


      List<Session> sessionsForUsers = MessageWebSocket.getSessionsForUsers(iwsMessage.getTarget());
      logger.info("Received ui event " + iwsMessage + " to send to " + sessionsForUsers.size() + " sessions");
      for (Session session : sessionsForUsers) {
         try {
            if (!session.isOpen()) {
               toRemove.add(session);
               continue;
            }
            session.getBasicRemote().sendObject(iwsMessage);
         } catch (EncodeException | IOException e) {
            logger.error( "unable to send message", e);
            try {
               session.close();
            } catch (IOException e1) {
               logger.error( "unable to close session");
            }
            toRemove.add(session);
         }
      }
      MessageWebSocket.removeAll(toRemove);
   }



} 

the MessageSender will listen to events of type IWSMessage and will send them to relevant sessions ( this is determined by the getTargetIds method on the message)

Step 2.d: Send NewMessage Event

now we will need to send the NewMessage event when a message is created by updating createMessage method in MessageService :

@Autowired
private ApplicationEventPublisher applicationEventPublisher;


public Message createMessage(MessageCreate messageCreate, SecurityContext securityContext) {
    Message message=createMessageNoMerge(messageCreate,securityContext);
    messageRepository.merge(message);
    applicationEventPublisher.publishEvent(message);
    return message;
}