Skip to content

Commit

Permalink
Release v1.1.3
Browse files Browse the repository at this point in the history
  • Loading branch information
alyakimenko authored Sep 12, 2019
2 parents 09323b9 + 2020118 commit 3836131
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 52 deletions.
102 changes: 51 additions & 51 deletions pkg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"time"

"github.com/MoonSHRD/p2chat-android/pkg/match"
Expand All @@ -25,20 +24,26 @@ import (
)

const (
// Defines the timeout when new peer was found
peerlistConnectionTimeout = time.Millisecond * 300
)

var (
myself host.Host
globalCtx context.Context
globalCtxCancel context.CancelFunc
Pb *pubsub.PubSub
networkTopics = mapset.NewSet()
messageQueue utils.Queue
handler p2chat.Handler
serviceTopic string
subscribedTopics map[string]chan struct{} // Pair "Topic-Channel", channel need for stopping listening
matches match.Response
// Pb is main object for accessing the pubsub system
Pb *pubsub.PubSub
// Match is object to work with matches.
// Get all matches, get new match, add new match, etc.
Match match.MatchProcessor

myself host.Host
globalCtx context.Context
globalCtxCancel context.CancelFunc
networkTopics = mapset.NewSet()
messageQueue utils.Queue
handler p2chat.Handler
serviceTopic string
// Pair "Topic-Channel", channel need for stopping listening
subscribedTopics map[string]chan struct{}
)

// this function get new messages from subscribed topic
Expand Down Expand Up @@ -83,7 +88,7 @@ func readSub(subscription *pubsub.Subscription, incomingMessagesChan chan pubsub
}
}

// Publish message into some topic
// PublishMessage publishes message into some topic
func PublishMessage(topic string, text string) {
message := &api.BaseMessage{
Body: text,
Expand All @@ -105,6 +110,7 @@ func PublishMessage(topic string, text string) {
}
}

// Start launches main p2chat process
func Start(rendezvous string, pid string, listenHost string, port int) {
subscribedTopics = make(map[string]chan struct{})
utils.SetConfig(&utils.Configuration{
Expand Down Expand Up @@ -190,103 +196,96 @@ MainLoop:
log.Println("Connection failed:", err)
}
log.Println("\nConnected to:", newPeer)

time.Sleep(peerlistConnectionTimeout)
matches = getMatchResponse()
getMatchResponse(newPeer.ID)
}
}
}
}

// GetJSONMatches returns the matches map within json format
func GetJSONMatches() []byte {
jsonResponse, err := json.Marshal(matches)
if err != nil {
log.Println(err.Error())
return []byte("{}")
}
return jsonResponse
}

// GetMatchResponse collects a list of topics to which the peer is subscribed,
// collects a list of peers from these topics,
// requests to its matrixIDs and then marshals them to json
func getMatchResponse() match.Response {
var response match.Response

func getMatchResponse(newPeerID peer.ID) {
// Send request for peers identity to fills up the identity map
GetPeersIdentity()

var peerTopics []string
peerMatrixID := getMatrixIDFromPeerID(newPeerID)

// Get topics this node is subscribed to check new node inclusiveness to them
topics := handler.GetTopics()
for _, topic := range topics {
// Get peer list of subscribed peers to specific topic
topicPeers := handler.GetPeers(topic)
response[topic] = getMatrixIDsFromPeers(topicPeers)

for _, peerID := range topicPeers {
// Check if new peer is included to the peer list of the specific topic
if peerID == newPeerID {
peerTopics = append(peerTopics, topic)
}
}
}

return response
Match.AddNewMatch(peerMatrixID, peerTopics)
}

// Passes through all peer.ID and takes out their matrixID
// from the identity matrix of handler
func getMatrixIDsFromPeers(peerIDs []peer.ID) []string {
// Returns the peer matrixID from identity map by its peerID
func getMatrixIDFromPeerID(peerID peer.ID) string {
idenityMap := handler.GetIdentityMap()

var matrixIDs []string
for _, peerID := range peerIDs {
matrixIDs = append(matrixIDs, idenityMap[peerID])
}

return matrixIDs
return idenityMap[peerID]
}

// SetMatrixID sets the matrixID of a current peer
func SetMatrixID(mxID string) {
handler.SetMatrixID(mxID)
}

// GetNetworkTopics requests network topics from other peers
func GetNetworkTopics() {
ctx := globalCtx
handler.RequestNetworkTopics(ctx)
}

// GetPeersIdentity requests MatrixID from other peers
func GetPeersIdentity() {
ctx := globalCtx
handler.RequestPeersIdentity(ctx)
}

func GetTopics() []byte {
// GetTopics is method for getting subcribed topics of current peer
func GetTopics() string {
topics := handler.GetTopics()
return convertStringSliceToBytes(topics)
return utils.ObjectToJSON(topics)
}

func GetPeers(topic string) []byte {
// GetPeers is method for getting peer ids by topic
func GetPeers(topic string) string {
var peersStrings []string

for _, peer := range handler.GetPeers(topic) {
peersStrings = append(peersStrings, string(peer))
}

return convertStringSliceToBytes(peersStrings)
}

func convertStringSliceToBytes(pids []string) []byte {
return []byte(strings.Join(pids, " "))
return utils.ObjectToJSON(peersStrings)
}

// BlacklistPeer blacklists peer by its peer.ID
func BlacklistPeer(pid string) {
handler.BlacklistPeer(peer.ID(pid))
}

// GetMessages returns json message string from the message-queue
func GetMessages() string {
textMessage := messageQueue.PopBack()
if textMessage != nil {
jsonData, err := json.Marshal(textMessage)
if err != nil {
return ""
}
return string(jsonData)
return utils.ObjectToJSON(textMessage)
}
return ""
}

// SubscribeToTopic allows to subscribe to specific topic
func SubscribeToTopic(topic string) {
incomingMessages := make(chan pubsub.Message)
subscription, err := Pb.Subscribe(topic)
Expand Down Expand Up @@ -316,6 +315,7 @@ ListenLoop:
}
}

// UnsubscribeFromTopic allows to unsubscribe from specific topic
func UnsubscribeFromTopic(topic string) {
if subscribedTopics[topic] != nil {
close(subscribedTopics[topic])
Expand Down
32 changes: 31 additions & 1 deletion pkg/match/match.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,35 @@
package match

import (
"github.com/MoonSHRD/p2chat-android/pkg/utils"
)

// Response alias is used for json marshaling
// the match ([topic]=>[mxIDs]) response
// the whole match ([topic]=>[matrixIDs]) response
// or for the new matches ([matrixID]=>[topics])
type Response map[string][]string

// MatchProcessor is helper type for work
// with match operations processing
type MatchProcessor struct {
mathes Response
newMatchesQueue utils.Queue
}

// GetAllMatches returns the whole matches map within json format
func (mp *MatchProcessor) GetAllMatches() string {
return utils.ObjectToJSON(mp.mathes)
}

// GetNewMatch returns new match from the matches-queue
func (mp *MatchProcessor) GetNewMatch() string {
return utils.ObjectToJSON(mp.newMatchesQueue.PopBack())
}

// AddNewMatch pushes new match [topic]=>[newMatrixID] to the matches-queue and map
func (mp *MatchProcessor) AddNewMatch(matrixID string, topics []string) {
for _, topic := range topics {
mp.mathes[topic] = append(mp.mathes[topic], matrixID)
}
mp.newMatchesQueue.PushBack(Response{matrixID: topics})
}
16 changes: 16 additions & 0 deletions pkg/utils/convert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package utils

import (
"encoding/json"
"log"
)

// ObjectToJSON converts any object to json-string
func ObjectToJSON(v interface{}) string {
json, err := json.Marshal(v)
if err != nil {
log.Println(err.Error())
return ""
}
return string(json)
}

0 comments on commit 3836131

Please sign in to comment.