From a4799b8cbf9bd2d6bcb10870d85b56caba3125da Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Tue, 26 May 2026 14:52:59 +0200 Subject: [PATCH] Support specifying To/From SIP headers for outbound. --- go.mod | 2 +- go.sum | 4 +- pkg/sip/client.go | 216 ++++++++++++++++++++++++++++++++++++-------- pkg/sip/outbound.go | 60 ++++-------- 4 files changed, 202 insertions(+), 80 deletions(-) diff --git a/go.mod b/go.mod index a1819c75..fc1ec688 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/media-sdk v0.0.0-20260522182459-8bac15173fa2 github.com/livekit/mediatransportutil v0.0.0-20260309115634-0e2e24b36ee8 - github.com/livekit/protocol v1.45.9-0.20260518225207-2cfe2d2aa772 + github.com/livekit/protocol v1.46.1-0.20260526102102-06bc4e74f196 github.com/livekit/psrpc v0.7.1 github.com/livekit/server-sdk-go/v2 v2.16.4-0.20260518235838-059306bbcfac github.com/livekit/sipgo v0.13.2-0.20260519205735-a5b4a38b6ceb diff --git a/go.sum b/go.sum index 71a783de..0e7e5350 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,8 @@ github.com/livekit/media-sdk v0.0.0-20260522182459-8bac15173fa2 h1:lLf+4efpv5gme github.com/livekit/media-sdk v0.0.0-20260522182459-8bac15173fa2/go.mod h1:y6iM86wusHKLd5Cqomiq/nRPB+UkMV6H7JTz/zAOoMs= github.com/livekit/mediatransportutil v0.0.0-20260309115634-0e2e24b36ee8 h1:coWig9fKxdb/nwOaIoGUUAogso12GblAJh/9SA9hcxk= github.com/livekit/mediatransportutil v0.0.0-20260309115634-0e2e24b36ee8/go.mod h1:RCd46PT+6sEztld6XpkCrG1xskb0u3SqxIjy4G897Ss= -github.com/livekit/protocol v1.45.9-0.20260518225207-2cfe2d2aa772 h1:JOlU9yyc65+qGW5os8fFgPtih89vSsGT+Dv42nzTMEw= -github.com/livekit/protocol v1.45.9-0.20260518225207-2cfe2d2aa772/go.mod h1:KEPIJ/ZdMFQ9tmmfv/uT9TjQEuEcZupCZBabuRGEC1k= +github.com/livekit/protocol v1.46.1-0.20260526102102-06bc4e74f196 h1:KVmbUFbgfpXVSSbkVB6GiEBnWbYg+xvfDZqhJkPpVKA= +github.com/livekit/protocol v1.46.1-0.20260526102102-06bc4e74f196/go.mod h1:KEPIJ/ZdMFQ9tmmfv/uT9TjQEuEcZupCZBabuRGEC1k= github.com/livekit/psrpc v0.7.1 h1:ms37az0QTD3UXIWuUC5D/SkmKOlRMVRsI261eBWu/Vw= github.com/livekit/psrpc v0.7.1/go.mod h1:bZ4iHFQptTkbPnB0LasvRNu/OBYXEu1NA6O5BMFo9kk= github.com/livekit/server-sdk-go/v2 v2.16.4-0.20260518235838-059306bbcfac h1:/pDmTAM7N3J5bzd5oGUBpQRUPAt17iyn1buTen5ZhXY= diff --git a/pkg/sip/client.go b/pkg/sip/client.go index 44049f06..c4a1ed29 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -16,8 +16,11 @@ package sip import ( "context" + "fmt" "log/slog" + "net" "net/netip" + "strconv" "strings" "sync" "time" @@ -25,6 +28,8 @@ import ( "github.com/frostbyte73/core" "golang.org/x/exp/maps" + esip "github.com/emiago/sipgo/sip" + "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" @@ -175,32 +180,168 @@ func (c *Client) getActiveCall(tag LocalTag) *outboundCall { return c.activeCalls[tag] } +func setUriTransport(p *sip.Uri, tr livekit.SIPTransport) { + if tr != livekit.SIPTransport_SIP_TRANSPORT_AUTO { + p.UriParams.Add("transport", tr.Name()) + } +} + +func buildLegacyURI(user, addr string, tr livekit.SIPTransport) (*sip.Uri, error) { + if user == "" { + return nil, fmt.Errorf("number must be set") + } else if strings.Contains(user, "@") { + return nil, fmt.Errorf("should be a phone number or SIP user, not a full SIP URI") + } + if addr == "" { + return nil, fmt.Errorf("address must be set") + } + if strings.HasPrefix(addr, "sip:") || strings.HasPrefix(addr, "sips:") { + return nil, fmt.Errorf("address must be a hostname without 'sip:' prefix") + } else if strings.Contains(addr, "transport=") { + return nil, fmt.Errorf("legacy address must not contain parameters; use transport field") + } else if strings.ContainsAny(addr, ";=") { + return nil, fmt.Errorf("legacy address must not contain parameters") + } + p := &sip.Uri{Scheme: "sip"} + setUriTransport(p, tr) + + p.User = user + if host, sport, err := net.SplitHostPort(addr); err == nil && sport != "" { + p.Host = host + p.Port, err = strconv.Atoi(sport) + if err != nil { + return nil, fmt.Errorf("invalid port in hostname: %q", sport) + } + } else { + p.Host = addr + } + return p, nil +} + +func buildRawURI(raw string, tr livekit.SIPTransport) (*sip.Uri, error) { + p := &sip.Uri{Scheme: "sip"} + setUriTransport(p, tr) + if err := esip.ParseUri(raw, p); err != nil { + return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "invalid request URI") + } + return p, nil +} + +func buildValuesURI(u *livekit.SIPUri, tr livekit.SIPTransport) (*sip.Uri, error) { + if tr != u.Transport { + if u.Transport == livekit.SIPTransport_SIP_TRANSPORT_AUTO { + //tr = tr + } else if tr == livekit.SIPTransport_SIP_TRANSPORT_AUTO { + tr = u.Transport + } else { + return nil, fmt.Errorf("different transports specified: %v vs %v", tr, u.Transport) + } + } + p := &sip.Uri{Scheme: "sip"} + setUriTransport(p, tr) + if u.User == "" { + return nil, fmt.Errorf("username or number must be set") + } + if u.Host == "" && u.Ip == "" { + return nil, fmt.Errorf("host or ip must be set") + } + p.User = u.User + p.Host = u.Host + if p.Host == "" { + p.Host = u.Ip + } + if _, sport, err := net.SplitHostPort(p.Host); err == nil && sport != "" { + return nil, fmt.Errorf("host or ip must not contain port") + } + p.Port = int(u.Port) + return p, nil +} + +func buildRequestURI(u *livekit.SIPRequestDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.Uri, error) { + if u == nil { + return buildLegacyURI(legacyUser, legacyAddr, tr) + } + switch u := u.Uri.(type) { + default: + case *livekit.SIPRequestDest_Raw: + return buildRawURI(u.Raw, tr) + case *livekit.SIPRequestDest_Values: + return buildValuesURI(u.Values, tr) + } + return nil, fmt.Errorf("invalid request URI type") +} + +func buildFromToURI(u *livekit.SIPNamedDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.Uri, error) { + if u == nil { + return buildLegacyURI(legacyUser, legacyAddr, tr) + } + switch u := u.Uri.(type) { + default: + case *livekit.SIPNamedDest_Raw: + return buildRawURI(u.Raw, tr) + case *livekit.SIPNamedDest_Values: + return buildValuesURI(u.Values, tr) + } + return nil, fmt.Errorf("invalid URI type") +} + +func buildFromHeader(u *livekit.SIPNamedDest, legacyName *string, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.FromHeader, error) { + su, err := buildFromToURI(u, legacyUser, legacyAddr, tr) + if err != nil { + return nil, err + } + h := &sip.FromHeader{ + Address: *su, + } + if u != nil { + h.DisplayName = u.DisplayName + } else if legacyName != nil { + h.DisplayName = *legacyName + } else { + // Nothing specified, preserve legacy behavior + h.DisplayName = su.User + } + return h, nil +} + +func buildToHeader(u *livekit.SIPNamedDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.ToHeader, error) { + su, err := buildFromToURI(u, legacyUser, legacyAddr, tr) + if err != nil { + return nil, err + } + h := &sip.ToHeader{ + Address: *su, + } + if u != nil { + h.DisplayName = u.DisplayName + } + return h, nil +} + +func buildOutboundHeaders(req *rpc.InternalCreateSIPParticipantRequest) (*sip.Uri, *sip.FromHeader, *sip.ToHeader, error) { + uri, err := buildRequestURI(req.SipRequestUri, req.CallTo, req.Address, req.Transport) + if err != nil { + return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid request URI: %w", err)) + } + to, err := buildToHeader(req.SipToHeader, req.CallTo, req.Address, req.Transport) + if err != nil { + return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid To header: %w", err)) + } + from, err := buildFromHeader(req.SipFromHeader, req.DisplayName, req.Number, req.Hostname, req.Transport) + if err != nil { + return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid From header: %w", err)) + } + return uri, from, to, nil +} + func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (resp *rpc.InternalCreateSIPParticipantResponse, retErr error) { if c.mon.Health() != stats.HealthOK { return nil, siperrors.ErrUnavailable } req.Upgrade() - if req.CallTo == "" { - return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "call-to number must be set") - } else if req.Address == "" { - return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "trunk adresss must be set") - } else if req.Number == "" { - return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "trunk outbound number must be set") - } else if req.RoomName == "" { + if req.RoomName == "" { return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "room name must be set") } - if strings.Contains(req.CallTo, "@") { - return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "call_to should be a phone number or SIP user, not a full SIP URI") - } - if strings.HasPrefix(req.Address, "sip:") || strings.HasPrefix(req.Address, "sips:") { - return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must be a hostname without 'sip:' prefix") - } - if strings.Contains(req.Address, "transport=") { - return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must not contain parameters; use transport field") - } - if strings.ContainsAny(req.Address, ";=") { - return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must not contain parameters") - } log := c.log if req.ProjectId != "" { log = log.WithValues("projectID", req.ProjectId) @@ -212,6 +353,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea if err != nil { return nil, err } + uri, from, to, err := buildOutboundHeaders(req) + if err != nil { + return nil, err + } tid := traceid.FromGUID(req.SipCallId) log = log.WithValues( "callID", req.SipCallId, @@ -219,15 +364,17 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea "room", req.RoomName, "participant", req.ParticipantIdentity, "participantName", req.ParticipantName, - "fromHost", req.Hostname, - "fromUser", req.Number, - "toHost", req.Address, - "toUser", req.CallTo, + "fromHost", from.Address.Host, + "fromUser", from.Address.User, + "toHost", to.Address.Host, + "toUser", to.Address.User, + "reqHost", uri.Host, + "reqUser", uri.User, "direction", "outbound", ) req.ParticipantAttributes = maps.Clone(req.ParticipantAttributes) // shallow clone - string/string map. Needed to avoid mutating psrpc req - state := NewCallState(c.getIOClient(req.ProjectId), c.createSIPCallInfo(req)) + state := NewCallState(c.getIOClient(req.ProjectId), c.createSIPCallInfo(uri, from, to, req)) defer func() { state.Update(ctx, func(info *livekit.SIPCallInfo) { @@ -255,11 +402,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea }, } sipConf := sipOutboundConfig{ - address: req.Address, transport: req.Transport, - host: req.Hostname, - from: req.Number, - to: req.CallTo, + uri: uri, + from: from, + to: to, user: req.Username, pass: req.Password, dtmf: req.Dtmf, @@ -273,7 +419,6 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea enabledFeatures: req.EnabledFeatures, featureFlags: req.FeatureFlags, mediaConfig: mconf, - displayName: req.DisplayName, } log.Infow("Creating SIP participant") call, err := c.newCall(ctx, tid, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, state, req.ProjectId) @@ -299,13 +444,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea return info, nil } -func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo { - toUri := CreateURIFromUserAndAddress(req.CallTo, req.Address, TransportFrom(req.Transport)) - fromiUri := URI{ - User: req.Number, - Host: req.Hostname, - Addr: netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort)), - } +func (c *Client) createSIPCallInfo(uri *sip.Uri, from *sip.FromHeader, to *sip.ToHeader, req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo { + toUri := ConvertURI(&to.Address) + fromUri := ConvertURI(&from.Address) + fromUri.Addr = netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort)) callInfo := &livekit.SIPCallInfo{ CallId: req.SipCallId, @@ -316,7 +458,7 @@ func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest) ParticipantAttributes: req.ParticipantAttributes, CallDirection: livekit.SIPCallDirection_SCD_OUTBOUND, ToUri: toUri.ToSIPUri(), - FromUri: fromiUri.ToSIPUri(), + FromUri: fromUri.ToSIPUri(), CreatedAtNs: time.Now().UnixNano(), MediaEncryption: req.MediaEncryption.String(), EnabledFeatures: req.EnabledFeatures, diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 9a833f4c..141c607b 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -46,11 +46,10 @@ import ( ) type sipOutboundConfig struct { - address string transport livekit.SIPTransport - host string - from string - to string + uri *sip.Uri + from *sip.FromHeader + to *sip.ToHeader user string pass string dtmf string @@ -64,7 +63,6 @@ type sipOutboundConfig struct { enabledFeatures []livekit.SIPFeature featureFlags map[string]string mediaConfig *sipMediaConfig - displayName *string } type outboundCall struct { @@ -104,15 +102,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi tr := TransportFrom(sipConf.transport) contact := c.ContactURI(tr) - if sipConf.host == "" { - sipConf.host = contact.GetHost() - } - fromURI := URI{ - User: sipConf.from, - Host: sipConf.host, - Addr: contact.Addr, - Transport: tr, - } + now := time.Now() call := &outboundCall{ c: c, @@ -126,13 +116,13 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi projectID: projectID, } call.stats.Update() - call.cc = c.newOutbound(log, id, fromURI, contact, sipConf.displayName, call.setAttrsToHeaders) + call.cc = c.newOutbound(log, id, sipConf.uri, sipConf.to, sipConf.from, contact, call.setAttrsToHeaders) call.log = call.log.WithValues("jitterBuf", call.jitterBuf, "sipCallID", call.cc.callID) if sipConf.featureFlags[outboundRouteHeadersFeatureFlag] == "true" { call.cc.routeHeaders = conf.OutboundRouteHeaders } - call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address) + call.mon = c.mon.NewCall(stats.Outbound, sipConf.from.Address.Host, sipConf.to.Address.Host) var err error call.media, err = NewMediaPort(tid, call.log, call.mon, &MediaOptions{ @@ -600,10 +590,8 @@ func (c *outboundCall) sipSignal(ctx context.Context, tid traceid.ID) error { c.mon.InviteReq() c.sigTs.InviteTime = time.Now() - toUri := CreateURIFromUserAndAddress(c.sipConf.to, c.sipConf.address, TransportFrom(c.sipConf.transport)) - ringing := false - sdpResp, err := c.cc.Invite(ctx, toUri, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOfferData, func(code sip.StatusCode, hdrs Headers) { + sdpResp, err := c.cc.Invite(ctx, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOfferData, func(code sip.StatusCode, hdrs Headers) { if code == sip.StatusOK { return // is set separately } @@ -742,27 +730,19 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, head return nil } -func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI, displayName *string, getHeaders setHeadersFunc) *sipOutbound { - from = from.Normalize() - if displayName == nil { // Nothing specified, preserve legacy behavior - displayName = &from.User - } - - fromHeader := &sip.FromHeader{ - DisplayName: *displayName, - Address: *from.GetURI(), - Params: sip.NewParams(), - } +func (c *Client) newOutbound(log logger.Logger, id LocalTag, uri *sip.Uri, to *sip.ToHeader, from *sip.FromHeader, contact URI, getHeaders setHeadersFunc) *sipOutbound { contactHeader := &sip.ContactHeader{ Address: *contact.GetContactURI(), } - fromHeader.Params.Add("tag", string(id)) + from.Params.Add("tag", string(id)) return &sipOutbound{ log: log, c: c, id: id, callID: guid.HashedID(string(id)), - from: fromHeader, + uri: uri, + to: to, + from: from, contact: contactHeader, referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request nextCSeq: 1, @@ -774,7 +754,9 @@ type sipOutbound struct { log logger.Logger c *Client id LocalTag + uri *sip.Uri from *sip.FromHeader + to *sip.ToHeader contact *sip.ContactHeader routeHeaders []string @@ -784,7 +766,6 @@ type sipOutbound struct { invite *sip.Request inviteOk *sip.Response localSDP []byte // SDP Offer, constrained by the answer - to *sip.ToHeader nextCSeq uint32 getHeaders setHeadersFunc @@ -884,12 +865,11 @@ func (c *sipOutbound) RemoteHeaders() Headers { return c.inviteOk.Headers() } -func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, headers map[string]string, sdpOffer []byte, setState sipRespFunc) ([]byte, error) { +func (c *sipOutbound) Invite(ctx context.Context, user, pass string, headers map[string]string, sdpOffer []byte, setState sipRespFunc) ([]byte, error) { ctx, span := Tracer.Start(ctx, "sip.outbound.Invite") defer span.End() c.mu.Lock() defer c.mu.Unlock() - toHeader := &sip.ToHeader{Address: *to.GetURI()} var ( sipHeaders Headers @@ -910,7 +890,7 @@ authLoop: if try >= 5 { return nil, psrpc.NewError(psrpc.FailedPrecondition, ErrAuthMaxRetry) } - req, resp, err = c.attemptInvite(ctx, sip.CallIDHeader(c.callID), toHeader, sdpOffer, authHeaderRespName, authHeader, sipHeaders, setState) + req, resp, err = c.attemptInvite(ctx, sip.CallIDHeader(c.callID), sdpOffer, authHeaderRespName, authHeader, sipHeaders, setState) if err != nil { return nil, err } @@ -978,7 +958,7 @@ authLoop: } c.invite, c.inviteOk = req, resp - toHeader = resp.To() + toHeader := resp.To() if toHeader == nil { return nil, psrpc.NewErrorf(psrpc.Internal, "no To header in INVITE response") } @@ -1024,16 +1004,16 @@ func (c *sipOutbound) AckInviteOK(ctx context.Context) error { return c.c.sipCli.WriteRequest(sip.NewAckRequest(c.invite, c.inviteOk, nil)) } -func (c *sipOutbound) attemptInvite(ctx context.Context, callID sip.CallIDHeader, to *sip.ToHeader, offer []byte, authHeaderName, authHeader string, headers Headers, setState sipRespFunc) (*sip.Request, *sip.Response, error) { +func (c *sipOutbound) attemptInvite(ctx context.Context, callID sip.CallIDHeader, offer []byte, authHeaderName, authHeader string, headers Headers, setState sipRespFunc) (*sip.Request, *sip.Response, error) { ctx, span := Tracer.Start(ctx, "sip.outbound.attemptInvite") defer span.End() - req := sip.NewRequest(sip.INVITE, to.Address) + req := sip.NewRequest(sip.INVITE, *c.uri) c.setCSeq(req) req.RemoveHeader("Call-ID") req.AppendHeader(&callID) req.SetBody(offer) - req.AppendHeader(to) + req.AppendHeader(c.to) req.AppendHeader(c.from) req.AppendHeader(c.contact)