Skip to content
Snippets Groups Projects
Unverified Commit 14a42071 authored by Sumner Evans's avatar Sumner Evans
Browse files

connector: implement FetchMessages

parent 9ad5a352
No related branches found
No related tags found
No related merge requests found
...@@ -2,65 +2,93 @@ package connector ...@@ -2,65 +2,93 @@ package connector
import ( import (
"context" "context"
"fmt" "time"
"github.com/rs/zerolog"
"maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/networkid"
"go.mau.fi/mautrix-linkedin/pkg/linkedingo" "go.mau.fi/mautrix-linkedin/pkg/linkedingo"
) )
func (l *LinkedInClient) FetchMessages(ctx context.Context, fetchParams bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) { func (l *LinkedInClient) FetchMessages(ctx context.Context, fetchParams bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
if messages, ok := fetchParams.BundledData.([]linkedingo.Message); ok { log := zerolog.Ctx(ctx).With().Str("method", "fetch_messages").Logger()
fmt.Printf("%+v\n", messages) ctx = log.WithContext(ctx)
portal, err := l.main.Bridge.GetPortalByKey(ctx, fetchParams.Portal.PortalKey)
if err != nil {
return nil, err
}
resp := bridgev2.FetchMessagesResponse{
Forward: fetchParams.Forward,
// MarkRead: markRead,
}
var messages []linkedingo.Message
if fetchParams.Cursor != "" {
msgs, err := l.client.GetMessagesWithPrevCursor(ctx, linkedingo.NewURN(fetchParams.Portal.ID), string(fetchParams.Cursor), fetchParams.Count)
if err != nil {
return nil, err
} else if len(msgs.Elements) == 0 {
return &bridgev2.FetchMessagesResponse{HasMore: false, Forward: fetchParams.Forward}, nil
}
messages = msgs.Elements
resp.Cursor = networkid.PaginationCursor(msgs.Metadata.PrevCursor)
} else {
msgs, err := l.client.GetMessagesBefore(ctx, linkedingo.NewURN(fetchParams.Portal.ID), time.Now(), fetchParams.Count)
if err != nil {
return nil, err
} else if len(msgs.Elements) == 0 {
return &bridgev2.FetchMessagesResponse{HasMore: false, Forward: fetchParams.Forward}, nil
}
messages = msgs.Elements
resp.Cursor = networkid.PaginationCursor(msgs.Metadata.PrevCursor)
}
var stopAt time.Time
if fetchParams.AnchorMessage != nil {
stopAt = fetchParams.AnchorMessage.Timestamp
log = log.With().Time("stop_at", stopAt).Logger()
}
for _, msg := range messages {
log := log.With().Stringer("entity_urn", msg.EntityURN).Logger()
ctx := log.WithContext(ctx)
if !stopAt.IsZero() {
if fetchParams.Forward && !msg.DeliveredAt.Time.After(stopAt) {
// If we are doing forward backfill and we got to before or at
// the anchor message, don't convert any more messages.
log.Debug().Msg("stopping at anchor message")
break
} else if !msg.DeliveredAt.Time.Before(stopAt) {
// If we are doing backwards backfill and we got to a message
// more recent or equal to the anchor message, skip it.
log.Debug().Msg("skipping message past anchor message")
continue
}
}
sender := l.makeSender(msg.Sender)
intent := portal.GetIntentFor(ctx, sender, l.userLogin, bridgev2.RemoteEventBackfill)
converted, err := l.convertToMatrix(ctx, portal, intent, msg)
if err != nil {
return nil, err
}
backfillMessage := bridgev2.BackfillMessage{
ConvertedMessage: converted,
Sender: sender,
ID: msg.MessageID(),
Timestamp: msg.DeliveredAt.Time,
}
// TODO reactions
resp.Messages = append(resp.Messages, &backfillMessage)
} }
// variables := queryold.FetchMessagesVariables{ resp.HasMore = len(resp.Messages) > 0
// ConversationURN: linkedingo.NewURN(fetchParams.Portal.ID), return &resp, nil
// CountBefore: 20,
// }
//
// if fetchParams.Cursor == "" {
// if fetchParams.AnchorMessage != nil {
// variables.DeliveredAt = jsontime.UM(fetchParams.AnchorMessage.Timestamp)
// }
// } else {
// deliveredAt, err := strconv.ParseInt(string(fetchParams.Cursor), 10, 64)
// if err != nil {
// return nil, err
// }
// variables.DeliveredAt = jsontime.UnixMilli{Time: time.UnixMilli(deliveredAt)}
// }
//
// fetchMessages, err := l.client.FetchMessages(ctx, variables)
// if err != nil {
// return nil, err
// }
//
// fmt.Printf("%+v\n", fetchMessages)
//
panic("here")
//
// messages := fetchMessages.Messages
// sort.Slice(messages, func(j, i int) bool {
// return messages[j].DeliveredAt < messages[i].DeliveredAt
// })
//
// backfilledMessages := make([]*bridgev2.BackfillMessage, len(messages))
// cursor := networkid.PaginationCursor("")
// if len(messages) > 0 {
// cursor = networkid.PaginationCursor(strconv.FormatInt(messages[0].DeliveredAt, 10))
//
// backfilledMessages, err = l.MessagesToBackfillMessages(ctx, messages, fetchParams.Portal)
// if err != nil {
// return nil, err
// }
// }
//
// return &bridgev2.FetchMessagesResponse{
// Messages: backfilledMessages,
// Cursor: cursor,
// HasMore: len(messages) >= 20,
// Forward: fetchParams.Forward,
// }, nil
//
} }
...@@ -171,7 +171,7 @@ func (l *LinkedInClient) onUnknownError(ctx context.Context, err error) { ...@@ -171,7 +171,7 @@ func (l *LinkedInClient) onUnknownError(ctx context.Context, err error) {
func (l *LinkedInClient) onDecoratedEvent(ctx context.Context, decoratedEvent *linkedingo.DecoratedEvent) { func (l *LinkedInClient) onDecoratedEvent(ctx context.Context, decoratedEvent *linkedingo.DecoratedEvent) {
// The topics are always of the form "urn:li-realtime:TOPIC_NAME:<topic_dependent>" // The topics are always of the form "urn:li-realtime:TOPIC_NAME:<topic_dependent>"
switch decoratedEvent.Topic.NthPart(2) { switch decoratedEvent.Topic.NthPrefixPart(2) {
case linkedingo.RealtimeEventTopicMessages: case linkedingo.RealtimeEventTopicMessages:
l.onRealtimeMessage(ctx, decoratedEvent.Payload.Data.DecoratedMessage.Result) l.onRealtimeMessage(ctx, decoratedEvent.Payload.Data.DecoratedMessage.Result)
case linkedingo.RealtimeEventTopicTypingIndicators: case linkedingo.RealtimeEventTopicTypingIndicators:
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"maunium.net/go/mautrix/bridgev2/simplevent" "maunium.net/go/mautrix/bridgev2/simplevent"
) )
// TODO limits
func (l *LinkedInClient) syncConversations(ctx context.Context) { func (l *LinkedInClient) syncConversations(ctx context.Context) {
log := zerolog.Ctx(ctx).With().Str("action", "sync_conversations").Logger() log := zerolog.Ctx(ctx).With().Str("action", "sync_conversations").Logger()
log.Info().Msg("starting conversation sync") log.Info().Msg("starting conversation sync")
...@@ -35,6 +36,9 @@ func (l *LinkedInClient) syncConversations(ctx context.Context) { ...@@ -35,6 +36,9 @@ func (l *LinkedInClient) syncConversations(ctx context.Context) {
if err != nil { if err != nil {
log.Err(err).Msg("failed to fetch conversations") log.Err(err).Msg("failed to fetch conversations")
return return
} else if conversations == nil {
log.Warn().Msg("no conversations found")
return
} }
fmt.Printf("%+v\n", conversations) fmt.Printf("%+v\n", conversations)
...@@ -60,10 +64,9 @@ func (l *LinkedInClient) syncConversations(ctx context.Context) { ...@@ -60,10 +64,9 @@ func (l *LinkedInClient) syncConversations(ctx context.Context) {
} }
} }
l.main.Bridge.QueueRemoteEvent(l.userLogin, &simplevent.ChatResync{ l.main.Bridge.QueueRemoteEvent(l.userLogin, &simplevent.ChatResync{
ChatInfo: ptr.Ptr(l.conversationToChatInfo(conv)), ChatInfo: ptr.Ptr(l.conversationToChatInfo(conv)),
EventMeta: meta.WithType(bridgev2.RemoteEventChatResync), EventMeta: meta.WithType(bridgev2.RemoteEventChatResync),
LatestMessageTS: latestMessageTS, LatestMessageTS: latestMessageTS,
BundledBackfillData: conv.Messages.Elements,
}) })
} }
} }
......
...@@ -58,10 +58,7 @@ const ( ...@@ -58,10 +58,7 @@ const (
) )
const ( const (
graphQLQueryIDMessengerConversations = "messengerConversations.7b27164c5517548167d9adb4ba603e55" graphQLQueryIDMessengerConversationsWithCursor = "messengerConversations.8656fb361a8ad0c178e8d3ff1a84ce26"
graphQLQueryIDMessengerConversationsWithCursor = "messengerConversations.8656fb361a8ad0c178e8d3ff1a84ce26" graphQLQueryIDMessengerMessagesByAnchorTimestamp = "messengerMessages.4088d03bc70c91c3fa68965cb42336de"
graphQLQueryIDMessengerConversationsWithSyncToken = "messengerConversations.277103fa0741e804ec5f21e6f64cb598" graphQLQueryIDMessengerMessagesByPrevCursor = "messengerMessages.34c9888be71c8010fecfb575cb38308f"
graphQLQueryIDMessengerMessagesBySyncToken = "messengerMessages.d1b494ac18c24c8be71ea07b5bd1f831"
graphQLQueryIDMessengerMessagesByAnchorTimestamp = "messengerMessages.b52340f92136e74c2aab21dac7cf7ff2"
graphQLQueryIDMessengerMessagesByConversation = "messengerMessages.86ca573adc64110d94d8bce89c5b2f3b"
) )
...@@ -18,6 +18,8 @@ type GraphQlResponse struct { ...@@ -18,6 +18,8 @@ type GraphQlResponse struct {
type GraphQLData struct { type GraphQLData struct {
MessengerConversationsByCategoryQuery *CollectionResponse[ConversationCursorMetadata, Conversation] `json:"messengerConversationsByCategoryQuery,omitempty"` MessengerConversationsByCategoryQuery *CollectionResponse[ConversationCursorMetadata, Conversation] `json:"messengerConversationsByCategoryQuery,omitempty"`
MessengerMessagesByAnchorTimestamp *CollectionResponse[MessageMetadata, Message] `json:"messengerMessagesByAnchorTimestamp,omitempty"`
MessengerMessagesByConversation *CollectionResponse[MessageMetadata, Message] `json:"messengerMessagesByConversation,omitempty"`
} }
// CollectionResponse represents a // CollectionResponse represents a
...@@ -32,6 +34,12 @@ type ConversationCursorMetadata struct { ...@@ -32,6 +34,12 @@ type ConversationCursorMetadata struct {
NextCursor string `json:"nextCursor,omitempty"` NextCursor string `json:"nextCursor,omitempty"`
} }
// MessageMetadata represents a com.linkedin.messenger.MessageMetadata object.
type MessageMetadata struct {
NextCursor string `json:"nextCursor,omitempty"`
PrevCursor string `json:"prevCursor,omitempty"`
}
// Conversation represents a com.linkedin.messenger.Conversation object // Conversation represents a com.linkedin.messenger.Conversation object
type Conversation struct { type Conversation struct {
Title string `json:"title,omitempty"` Title string `json:"title,omitempty"`
......
package linkedingo package linkedingo
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/rs/zerolog"
"go.mau.fi/util/jsontime" "go.mau.fi/util/jsontime"
"go.mau.fi/util/random" "go.mau.fi/util/random"
"maunium.net/go/mautrix/bridgev2/networkid" "maunium.net/go/mautrix/bridgev2/networkid"
"go.mau.fi/mautrix-linkedin/pkg/linkedingoold/routingold/queryold"
"go.mau.fi/mautrix-linkedin/pkg/linkedingoold/routingold/responseold"
) )
type sendMessagePayload struct { type sendMessagePayload struct {
...@@ -190,47 +192,51 @@ func (c *Client) RecallMessage(ctx context.Context, messageURN URN) error { ...@@ -190,47 +192,51 @@ func (c *Client) RecallMessage(ctx context.Context, messageURN URN) error {
return nil return nil
} }
func (c *Client) FetchMessages(ctx context.Context, conversationURN URN, variables queryold.FetchMessagesVariables) (*responseold.MessengerMessagesResponse, error) { func (c *Client) GetMessagesBefore(ctx context.Context, conversationURN URN, before time.Time, count int) (*CollectionResponse[MessageMetadata, Message], error) {
withCursor := variables.PrevCursor != "" zerolog.Ctx(ctx).Info().
withAnchorTimestamp := !variables.DeliveredAt.IsZero() Time("before", before).
Msg("Getting conversations delivered before")
resp, err := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL).
WithGraphQLQuery(graphQLQueryIDMessengerMessagesByAnchorTimestamp, map[string]string{
"deliveredAt": strconv.Itoa(int(before.UnixMilli())),
"conversationUrn": url.QueryEscape(conversationURN.WithPrefix("urn", "li", "msg_conversation").String()),
"countBefore": strconv.Itoa(count),
"countAfter": "0",
}).
WithCSRF().
WithXLIHeaders().
WithHeader("accept", contentTypeGraphQL).
Do(ctx)
if err != nil {
return nil, err
}
x, _ := io.ReadAll(resp.Body)
fmt.Printf("%s\n", x)
resp.Body = io.NopCloser(bytes.NewReader(x))
var queryID string var response GraphQlResponse
if withCursor { return response.Data.MessengerMessagesByAnchorTimestamp, json.NewDecoder(resp.Body).Decode(&response)
queryID = graphQLQueryIDMessengerMessagesByConversation }
} else if withAnchorTimestamp {
queryID = graphQLQueryIDMessengerMessagesByAnchorTimestamp func (c *Client) GetMessagesWithPrevCursor(ctx context.Context, conversationURN URN, prevCursor string, count int) (*CollectionResponse[MessageMetadata, Message], error) {
} else { zerolog.Ctx(ctx).Info().
queryID = graphQLQueryIDMessengerMessagesBySyncToken Str("prev_cursor", prevCursor).
Msg("Getting conversations with prev cursor")
resp, err := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL).
WithGraphQLQuery(graphQLQueryIDMessengerMessagesByPrevCursor, map[string]string{
"conversationUrn": url.QueryEscape(conversationURN.WithPrefix("urn", "li", "msg_conversation").String()),
"count": strconv.Itoa(count),
"prevCursor": url.QueryEscape(prevCursor),
}).
WithCSRF().
WithXLIHeaders().
WithHeader("accept", contentTypeGraphQL).
Do(ctx)
if err != nil {
return nil, err
} }
fmt.Printf("queryID = %s\n", queryID)
return nil, nil var response GraphQlResponse
return response.Data.MessengerMessagesByConversation, json.NewDecoder(resp.Body).Decode(&response)
// variablesQuery, err := variables.Encode()
// if err != nil {
// return nil, err
// }
//
// resp, err := c.newAuthedRequest(http.MethodGet, linkedInVoyagerMessagingGraphQLURL).
// WithGraphQLQuery(queryID, variables).
// WithCSRF().
// WithXLIHeaders().
// WithHeader("accept", contentTypeGraphQL).
// Do(ctx)
// if err != nil {
// return nil, err
// }
//
// var graphQLResponse responseold.GraphQlResponse
// if err = json.NewDecoder(resp.Body).Decode(&graphQLResponse); err != nil {
// return nil, err
// }
//
// graphQLResponseData := graphQLResponse.Data
// if withCursor {
// return graphQLResponseData.MessengerMessagesByConversation, nil
// } else if withAnchorTimestamp {
// return graphQLResponseData.MessengerMessagesByAnchorTimestamp, nil
// } else {
// return graphQLResponseData.MessengerMessagesBySyncToken, nil
// }
} }
...@@ -70,6 +70,7 @@ func (a *authedRequest) WithRawQuery(raw string) *authedRequest { ...@@ -70,6 +70,7 @@ func (a *authedRequest) WithRawQuery(raw string) *authedRequest {
} }
func (a *authedRequest) WithGraphQLQuery(queryID string, variables map[string]string) *authedRequest { func (a *authedRequest) WithGraphQLQuery(queryID string, variables map[string]string) *authedRequest {
a.WithHeader("accept", contentTypeGraphQL)
var queryStr strings.Builder var queryStr strings.Builder
queryStr.WriteString("queryId=") queryStr.WriteString("queryId=")
queryStr.WriteString(queryID) queryStr.WriteString(queryID)
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/url" "net/url"
"regexp"
"strings" "strings"
) )
...@@ -14,13 +15,19 @@ func (u URNString) URN() URN { ...@@ -14,13 +15,19 @@ func (u URNString) URN() URN {
} }
type URN struct { type URN struct {
parts []string prefix string
idParts []string id string
} }
var urnRegex = regexp.MustCompile(`^(.*?):(\(.*\)|[^:]*)$`)
func NewURN[T ~string](s T) (u URN) { func NewURN[T ~string](s T) (u URN) {
u.parts = strings.Split(string(s), ":") match := urnRegex.FindStringSubmatch(string(s))
u.idParts = strings.Split(strings.Trim(u.parts[len(u.parts)-1], "()"), ",") if len(match) == 0 {
return URN{id: string(s)}
}
u.prefix = match[1]
u.id = match[2]
return return
} }
...@@ -29,10 +36,7 @@ var _ json.Unmarshaler = (*URN)(nil) ...@@ -29,10 +36,7 @@ var _ json.Unmarshaler = (*URN)(nil)
var _ fmt.Stringer = (*URN)(nil) var _ fmt.Stringer = (*URN)(nil)
func (u URN) ID() string { func (u URN) ID() string {
if len(u.idParts) != 1 { return u.id
panic(fmt.Sprintf("wrong number of ID parts %d", len(u.idParts)))
}
return u.idParts[0]
} }
func (u URN) URNString() URNString { func (u URN) URNString() URNString {
...@@ -40,7 +44,7 @@ func (u URN) URNString() URNString { ...@@ -40,7 +44,7 @@ func (u URN) URNString() URNString {
} }
func (u URN) String() string { func (u URN) String() string {
return strings.Join(u.parts, ":") return u.prefix + ":" + u.id
} }
func (u URN) URLEscaped() string { func (u URN) URLEscaped() string {
...@@ -48,7 +52,7 @@ func (u URN) URLEscaped() string { ...@@ -48,7 +52,7 @@ func (u URN) URLEscaped() string {
} }
func (u URN) IsEmpty() bool { func (u URN) IsEmpty() bool {
return len(u.parts) == 0 return len(u.id) == 0
} }
func (u *URN) UnmarshalJSON(data []byte) (err error) { func (u *URN) UnmarshalJSON(data []byte) (err error) {
...@@ -57,8 +61,8 @@ func (u *URN) UnmarshalJSON(data []byte) (err error) { ...@@ -57,8 +61,8 @@ func (u *URN) UnmarshalJSON(data []byte) (err error) {
return err return err
} }
newURN := NewURN(urn) newURN := NewURN(urn)
u.parts = newURN.parts u.prefix = newURN.prefix
u.idParts = newURN.idParts u.id = newURN.id
return nil return nil
} }
...@@ -66,13 +70,13 @@ func (u URN) MarshalJSON() ([]byte, error) { ...@@ -66,13 +70,13 @@ func (u URN) MarshalJSON() ([]byte, error) {
return json.Marshal(u.String()) return json.Marshal(u.String())
} }
func (u URN) NthPart(n int) string { func (u URN) NthPrefixPart(n int) string {
return u.parts[n] return strings.Split(u.prefix, ":")[n]
} }
// WithPrefix returns a URN with the given prefix but the same ID (last part) // WithPrefix returns a URN with the given prefix but the same ID (last part)
func (u URN) WithPrefix(prefix ...string) (n URN) { func (u URN) WithPrefix(prefix ...string) (n URN) {
n.parts = append(prefix, u.parts[len(u.parts)-1]) n.prefix = strings.Join(prefix, ":")
n.idParts = u.idParts n.id = u.id
return return
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment