Skip to content
Snippets Groups Projects
Unverified Commit 882927b3 authored by Sumner Evans's avatar Sumner Evans
Browse files

linkedingo: improve reporting of realtime connect errors

parent caf6c304
No related branches found
No related tags found
No related merge requests found
Pipeline #16517 passed
......@@ -82,8 +82,10 @@ func NewLinkedInClient(ctx context.Context, lc *LinkedInConnector, login *bridge
ClientConnection: func(context.Context, *linkedingo.ClientConnection) {
login.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
},
RealtimeConnectError: client.onRealtimeConnectError,
DecoratedEvent: client.onDecoratedEvent,
TransientDisconnect: client.onTransientDisconnect,
BadCredentials: client.onBadCredentials,
UnknownError: client.onUnknownError,
DecoratedEvent: client.onDecoratedEvent,
},
)
......@@ -137,14 +139,33 @@ func (l *LinkedInClient) Connect(ctx context.Context) {
}
}
func (l *LinkedInClient) onRealtimeConnectError(ctx context.Context, err error) {
func (l *LinkedInClient) onTransientDisconnect(ctx context.Context, err error) {
zerolog.Ctx(ctx).Err(err).Msg("failed to read from event stream")
// TODO probably don't do this unconditionally
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateTransientDisconnect,
Error: "linkedin-transient-disconnect",
Message: err.Error(),
})
}
func (l *LinkedInClient) onBadCredentials(ctx context.Context, err error) {
zerolog.Ctx(ctx).Err(err).Msg("bad credentials")
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: "linkedin-no-auth",
Error: "linkedin-bad-credentials",
Message: err.Error(),
})
l.Disconnect()
}
func (l *LinkedInClient) onUnknownError(ctx context.Context, err error) {
zerolog.Ctx(ctx).Err(err).Msg("unknown error")
l.userLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateUnknownError,
Error: "linkedin-unknown-error",
Message: err.Error(),
})
// TODO probably don't do this unconditionally?
l.Disconnect()
}
......
......@@ -63,10 +63,12 @@ func NewClient(ctx context.Context, userEntityURN types.URN, jar *stringcookieja
}
type Handlers struct {
Heartbeat func(context.Context)
ClientConnection func(context.Context, *ClientConnection)
RealtimeConnectError func(context.Context, error)
DecoratedEvent func(context.Context, *DecoratedEvent)
Heartbeat func(context.Context)
ClientConnection func(context.Context, *ClientConnection)
TransientDisconnect func(context.Context, error)
BadCredentials func(context.Context, error)
UnknownError func(context.Context, error)
DecoratedEvent func(context.Context, *DecoratedEvent)
}
func (h Handlers) onHeartbeat(ctx context.Context) {
......@@ -81,9 +83,21 @@ func (h Handlers) onClientConnection(ctx context.Context, conn *ClientConnection
}
}
func (h Handlers) onRealtimeConnectError(ctx context.Context, err error) {
if h.RealtimeConnectError != nil {
h.RealtimeConnectError(ctx, err)
func (h Handlers) onTransientDisconnect(ctx context.Context, err error) {
if h.TransientDisconnect != nil {
h.TransientDisconnect(ctx, err)
}
}
func (h Handlers) onBadCredentials(ctx context.Context, err error) {
if h.BadCredentials != nil {
h.BadCredentials(ctx, err)
}
}
func (h Handlers) onUnknownError(ctx context.Context, err error) {
if h.UnknownError != nil {
h.UnknownError(ctx, err)
}
}
......
......@@ -230,11 +230,15 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) {
WithHeader("Accept", contentTypeTextEventStream).
Do(ctx)
if err != nil {
c.handlers.onRealtimeConnectError(ctx, err)
c.handlers.onUnknownError(ctx, fmt.Errorf("failed to connect: %w", err))
return
}
if c.realtimeResp.StatusCode != http.StatusOK {
c.handlers.onRealtimeConnectError(ctx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode))
} else if c.realtimeResp.StatusCode != http.StatusOK {
switch c.realtimeResp.StatusCode {
case http.StatusUnauthorized:
c.handlers.onBadCredentials(ctx, fmt.Errorf("got %d on connect", c.realtimeResp.StatusCode))
default:
c.handlers.onUnknownError(ctx, fmt.Errorf("failed to connect due to status code %d", c.realtimeResp.StatusCode))
}
return
}
......@@ -245,12 +249,15 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) {
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
if errors.Is(err, io.EOF) {
} else if errors.Is(err, io.EOF) {
log.Info().
Stringer("realtime_session_id", c.realtimeSessionID).
Msg("Realtime stream closed")
break
} else {
c.handlers.onTransientDisconnect(ctx, fmt.Errorf("failed to read realtime stream: %w", err))
break
}
c.handlers.onRealtimeConnectError(ctx, err)
break
}
if !bytes.HasPrefix(line, []byte("data:")) {
......@@ -259,7 +266,7 @@ func (c *Client) realtimeConnectLoop(ctx context.Context) {
var realtimeEvent RealtimeEvent
if err = json.Unmarshal(line[6:], &realtimeEvent); err != nil {
c.handlers.onRealtimeConnectError(ctx, err)
c.handlers.onTransientDisconnect(ctx, fmt.Errorf("failed to unmarshal realtime event: %w", err))
break
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment