From 3ff3f45e30d9c29f99932565af2f537f0608a5d4 Mon Sep 17 00:00:00 2001 From: Rafael Marinho Date: Mon, 22 Dec 2025 11:25:19 +0100 Subject: [PATCH 01/14] [CHA-1610] Add support channels batch update --- lib/stream-chat/channel_batch_updater.rb | 211 +++++++++++++++++++++++ lib/stream-chat/client.rb | 15 ++ 2 files changed, 226 insertions(+) create mode 100644 lib/stream-chat/channel_batch_updater.rb diff --git a/lib/stream-chat/channel_batch_updater.rb b/lib/stream-chat/channel_batch_updater.rb new file mode 100644 index 0000000..c29472b --- /dev/null +++ b/lib/stream-chat/channel_batch_updater.rb @@ -0,0 +1,211 @@ +# typed: strict +# frozen_string_literal: true + +require 'stream-chat/client' +require 'stream-chat/stream_response' +require 'stream-chat/types' + +module StreamChat + class ChannelBatchUpdater + extend T::Sig + + sig { params(client: StreamChat::Client).void } + def initialize(client) + @client = client + end + + # Member operations + + # addMembers - Add members to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T.any(T::Array[String], T::Array[StringKeyHash])] Members to add + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T.any(T::Array[String], T::Array[StringKeyHash])).returns(StreamChat::StreamResponse) } + def add_members(filter, members) + @client.update_channels_batch( + { + operation: 'addMembers', + filter: filter, + members: members + } + ) + end + + # removeMembers - Remove members from channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[String]] Member IDs to remove + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + def remove_members(filter, members) + @client.update_channels_batch( + { + operation: 'removeMembers', + filter: filter, + members: members + } + ) + end + + # inviteMembers - Invite members to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T.any(T::Array[String], T::Array[StringKeyHash])] Members to invite + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T.any(T::Array[String], T::Array[StringKeyHash])).returns(StreamChat::StreamResponse) } + def invite_members(filter, members) + @client.update_channels_batch( + { + operation: 'invites', + filter: filter, + members: members + } + ) + end + + # addModerators - Add moderators to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[String]] Member IDs to promote to moderator + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + def add_moderators(filter, members) + @client.update_channels_batch( + { + operation: 'addModerators', + filter: filter, + members: members + } + ) + end + + # demoteModerators - Remove moderator role from members in channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[String]] Member IDs to demote + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + def demote_moderators(filter, members) + @client.update_channels_batch( + { + operation: 'demoteModerators', + filter: filter, + members: members + } + ) + end + + # assignRoles - Assign roles to members in channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[StringKeyHash]] Members with role assignments + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } + def assign_roles(filter, members) + @client.update_channels_batch( + { + operation: 'assignRoles', + filter: filter, + members: members + } + ) + end + + # Visibility operations + + # hide - Hide channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } + def hide(filter) + @client.update_channels_batch( + { + operation: 'hide', + filter: filter + } + ) + end + + # show - Show channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } + def show(filter) + @client.update_channels_batch( + { + operation: 'show', + filter: filter + } + ) + end + + # archive - Archive channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } + def archive(filter) + @client.update_channels_batch( + { + operation: 'archive', + filter: filter + } + ) + end + + # unarchive - Unarchive channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } + def unarchive(filter) + @client.update_channels_batch( + { + operation: 'unarchive', + filter: filter + } + ) + end + + # Data operations + + # updateData - Update data on channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param data [StringKeyHash] Data to update + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, data: StringKeyHash).returns(StreamChat::StreamResponse) } + def update_data(filter, data) + @client.update_channels_batch( + { + operation: 'updateData', + filter: filter, + data: data + } + ) + end + + # addFilterTags - Add filter tags to channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param tags [T::Array[String]] Tags to add + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, tags: T::Array[String]).returns(StreamChat::StreamResponse) } + def add_filter_tags(filter, tags) + @client.update_channels_batch( + { + operation: 'addFilterTags', + filter: filter, + filter_tags_update: tags + } + ) + end + + # removeFilterTags - Remove filter tags from channels matching the filter + # @param filter [StringKeyHash] Filter to select channels + # @param tags [T::Array[String]] Tags to remove + # @return [StreamChat::StreamResponse] The server response + sig { params(filter: StringKeyHash, tags: T::Array[String]).returns(StreamChat::StreamResponse) } + def remove_filter_tags(filter, tags) + @client.update_channels_batch( + { + operation: 'removeFilterTags', + filter: filter, + filter_tags_update: tags + } + ) + end + end +end + diff --git a/lib/stream-chat/client.rb b/lib/stream-chat/client.rb index 1f46789..4ec19db 100644 --- a/lib/stream-chat/client.rb +++ b/lib/stream-chat/client.rb @@ -1074,6 +1074,21 @@ def mark_delivered(data = nil, user_id: nil) post('channels/delivered', data: data || {}, params: { user_id: user_id }) end + # Update channels in batch. + # @param payload [StringKeyHash] Payload containing operation, filter, and optional members/data/filter_tags_update + # @return [StreamChat::StreamResponse] API response + sig { params(payload: StringKeyHash).returns(StreamChat::StreamResponse) } + def update_channels_batch(payload) + put('channels/batch', data: payload) + end + + # Returns a ChannelBatchUpdater instance for batch channel operations. + # @return [StreamChat::ChannelBatchUpdater] A ChannelBatchUpdater instance + sig { returns(StreamChat::ChannelBatchUpdater) } + def channel_batch_updater + ChannelBatchUpdater.new(self) + end + private sig { returns(T::Hash[String, String]) } From 97c77b38a66c911baebbf9eef196552d961ee19b Mon Sep 17 00:00:00 2001 From: javierdfm Date: Mon, 19 Jan 2026 15:00:58 +0100 Subject: [PATCH 02/14] fix: removed trailing blank line --- lib/stream-chat/channel_batch_updater.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/stream-chat/channel_batch_updater.rb b/lib/stream-chat/channel_batch_updater.rb index c29472b..04b26b8 100644 --- a/lib/stream-chat/channel_batch_updater.rb +++ b/lib/stream-chat/channel_batch_updater.rb @@ -208,4 +208,3 @@ def remove_filter_tags(filter, tags) end end end - From e2a6d9bc2487757766af252e41c6239fe944cd5b Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 09:20:48 +0100 Subject: [PATCH 03/14] fix: Add channel_batch_updater to client.rb --- lib/stream-chat/client.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/stream-chat/client.rb b/lib/stream-chat/client.rb index 5385b28..494e38f 100644 --- a/lib/stream-chat/client.rb +++ b/lib/stream-chat/client.rb @@ -17,6 +17,7 @@ require 'stream-chat/util' require 'stream-chat/types' require 'stream-chat/moderation' +require 'stream-chat/channel_batch_updater' module StreamChat DEFAULT_BLOCKLIST = 'profanity_en_2020_v1' From 74ccbe6ef3a40d1a1a07c74f9fbad97550e800db Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 09:57:27 +0100 Subject: [PATCH 04/14] fix: fixed archive and unarchive and added tests --- lib/stream-chat/channel_batch_updater.rb | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/stream-chat/channel_batch_updater.rb b/lib/stream-chat/channel_batch_updater.rb index 04b26b8..654bd91 100644 --- a/lib/stream-chat/channel_batch_updater.rb +++ b/lib/stream-chat/channel_batch_updater.rb @@ -134,28 +134,32 @@ def show(filter) ) end - # archive - Archive channels matching the filter + # archive - Archive channels matching the filter for specified members # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[String]] Member IDs to archive the channels for # @return [StreamChat::StreamResponse] The server response - sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } - def archive(filter) + sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + def archive(filter, members) @client.update_channels_batch( { operation: 'archive', - filter: filter + filter: filter, + members: members } ) end - # unarchive - Unarchive channels matching the filter + # unarchive - Unarchive channels matching the filter for specified members # @param filter [StringKeyHash] Filter to select channels + # @param members [T::Array[String]] Member IDs to unarchive the channels for # @return [StreamChat::StreamResponse] The server response - sig { params(filter: StringKeyHash).returns(StreamChat::StreamResponse) } - def unarchive(filter) + sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + def unarchive(filter, members) @client.update_channels_batch( { operation: 'unarchive', - filter: filter + filter: filter, + members: members } ) end From e20768d1d378d9086eb9a79d33cc0eff19bd446e Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 09:57:37 +0100 Subject: [PATCH 05/14] fix: fixed archive and unarchive and added tests --- spec/channel_batch_updater_spec.rb | 232 +++++++++++++++++++++++++++++ 1 file changed, 232 insertions(+) create mode 100644 spec/channel_batch_updater_spec.rb diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb new file mode 100644 index 0000000..b13c8d2 --- /dev/null +++ b/spec/channel_batch_updater_spec.rb @@ -0,0 +1,232 @@ +# frozen_string_literal: true + +require 'securerandom' +require 'stream-chat' + +describe StreamChat::ChannelBatchUpdater do + def loop_times(times) + loop do + begin + yield() + return + rescue StandardError, RSpec::Expectations::ExpectationNotMetError + raise if times.zero? + end + + sleep(1) + times -= 1 + end + end + + def wait_for_task(task_id, timeout_seconds: 120) + sleep(2) # Initial delay + + timeout_seconds.times do |i| + begin + task = @client.get_task(task_id) + rescue StandardError => e + if i < 10 + sleep(1) + next + end + raise e + end + + expect(task['id']).to eq(task_id) + + case task['status'] + when 'waiting', 'pending', 'running' + sleep(1) + when 'completed' + return task + when 'failed' + if task['result']&.dig('description')&.downcase&.include?('rate limit') + sleep(2) + next + end + raise "Task failed with result: #{task['result']}" + end + end + + raise "Task did not complete within #{timeout_seconds} seconds" + end + + before(:all) do + @client = StreamChat::Client.from_env + @created_users = [] + end + + before(:each) do + @random_users = [{ id: SecureRandom.uuid, name: 'user1' }, { id: SecureRandom.uuid, name: 'user2' }] + @random_user = { id: SecureRandom.uuid } + + users_to_insert = [@random_users[0], @random_users[1], @random_user] + + @created_users.push(*users_to_insert.map { |u| u[:id] }) + @client.upsert_users(users_to_insert) + + @channel1 = @client.channel('messaging', channel_id: SecureRandom.uuid, data: { test: true }) + @channel1.create(@random_user[:id]) + + @channel2 = @client.channel('messaging', channel_id: SecureRandom.uuid, data: { test: true }) + @channel2.create(@random_user[:id]) + end + + after(:each) do + @channel1&.delete + rescue StreamChat::StreamAPIException + # Ignore if channel already deleted + ensure + begin + @channel2&.delete + rescue StreamChat::StreamAPIException + # Ignore if channel already deleted + end + end + + after(:all) do + curr_idx = 0 + batch_size = 25 + + slice = @created_users.slice(0, batch_size) + + while !slice.nil? && !slice.empty? + @client.delete_users(slice, user: StreamChat::HARD_DELETE, messages: StreamChat::HARD_DELETE) + + curr_idx += batch_size + slice = @created_users.slice(curr_idx, batch_size) + end + end + + describe 'Client#update_channels_batch' do + it 'returns error if options is empty' do + expect { @client.update_channels_batch({}) }.to raise_error(StreamChat::StreamAPIException) + end + + it 'batch updates channels with valid options' do + response = @client.update_channels_batch( + { + operation: 'addMembers', + filter: { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + members: [@random_users[0][:id]] + } + ) + + expect(response['task_id']).not_to be_empty + end + end + + describe 'ChannelBatchUpdater#add_members' do + it 'adds members to channels matching filter' do + updater = @client.channel_batch_updater + + members = @random_users.map { |u| u[:id] } + response = updater.add_members( + { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + members + ) + + expect(response['task_id']).not_to be_empty + task_id = response['task_id'] + + wait_for_task(task_id) + + # Verify members were added + loop_times(120) do + ch1_state = @channel1.query + ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + + members.each do |member_id| + expect(ch1_member_ids).to include(member_id) + end + end + end + end + + describe 'ChannelBatchUpdater#remove_members' do + it 'removes members from channels matching filter' do + # First add both users as members to both channels + members_to_add = @random_users.map { |u| u[:id] } + @channel1.add_members(members_to_add) + @channel2.add_members(members_to_add) + + # Verify members were added + loop_times(60) do + ch1_state = @channel1.query + expect(ch1_state['members'].length).to eq(2) + + ch2_state = @channel2.query + expect(ch2_state['members'].length).to eq(2) + end + + # Verify member IDs match + ch1_state = @channel1.query + ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + expect(ch1_member_ids).to match_array(members_to_add) + + ch2_state = @channel2.query + ch2_member_ids = ch2_state['members'].map { |m| m['user_id'] } + expect(ch2_member_ids).to match_array(members_to_add) + + # Now remove one member using batch updater + updater = @client.channel_batch_updater + member_to_remove = members_to_add[0] + + response = updater.remove_members( + { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + [member_to_remove] + ) + + expect(response['task_id']).not_to be_empty + task_id = response['task_id'] + + wait_for_task(task_id) + + # Verify member was removed + loop_times(120) do + ch1_state = @channel1.query + ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + + expect(ch1_member_ids).not_to include(member_to_remove) + end + end + end + + describe 'ChannelBatchUpdater#archive' do + it 'archives channels for specified members' do + # First add both users as members to both channels + members_to_add = @random_users.map { |u| u[:id] } + @channel1.add_members(members_to_add) + @channel2.add_members(members_to_add) + + # Wait for members to be added + loop_times(60) do + ch1_state = @channel1.query + expect(ch1_state['members'].length).to eq(2) + end + + # Archive channels for one member + updater = @client.channel_batch_updater + member_to_archive = members_to_add[0] + + response = updater.archive( + { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + [member_to_archive] + ) + + expect(response['task_id']).not_to be_empty + task_id = response['task_id'] + + wait_for_task(task_id) + + # Verify archived_at is set for the member + loop_times(120) do + ch1_state = @channel1.query + member = ch1_state['members'].find { |m| m['user_id'] == member_to_archive } + + expect(member).not_to be_nil + expect(member['archived_at']).not_to be_nil + end + end + end +end From 2116bdb2eccc6f74a94c285eeebba6f9d3316893 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 10:02:10 +0100 Subject: [PATCH 06/14] fix: fixed tests --- lib/stream-chat/channel_batch_updater.rb | 12 +++--- spec/channel_batch_updater_spec.rb | 49 +++++++++++++++--------- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/lib/stream-chat/channel_batch_updater.rb b/lib/stream-chat/channel_batch_updater.rb index 654bd91..bfecf04 100644 --- a/lib/stream-chat/channel_batch_updater.rb +++ b/lib/stream-chat/channel_batch_updater.rb @@ -33,9 +33,9 @@ def add_members(filter, members) # removeMembers - Remove members from channels matching the filter # @param filter [StringKeyHash] Filter to select channels - # @param members [T::Array[String]] Member IDs to remove + # @param members [T::Array[StringKeyHash]] Members to remove (each with user_id key) # @return [StreamChat::StreamResponse] The server response - sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } def remove_members(filter, members) @client.update_channels_batch( { @@ -136,9 +136,9 @@ def show(filter) # archive - Archive channels matching the filter for specified members # @param filter [StringKeyHash] Filter to select channels - # @param members [T::Array[String]] Member IDs to archive the channels for + # @param members [T::Array[StringKeyHash]] Members to archive channels for (each with user_id key) # @return [StreamChat::StreamResponse] The server response - sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } def archive(filter, members) @client.update_channels_batch( { @@ -151,9 +151,9 @@ def archive(filter, members) # unarchive - Unarchive channels matching the filter for specified members # @param filter [StringKeyHash] Filter to select channels - # @param members [T::Array[String]] Member IDs to unarchive the channels for + # @param members [T::Array[StringKeyHash]] Members to unarchive channels for (each with user_id key) # @return [StreamChat::StreamResponse] The server response - sig { params(filter: StringKeyHash, members: T::Array[String]).returns(StreamChat::StreamResponse) } + sig { params(filter: StringKeyHash, members: T::Array[StringKeyHash]).returns(StreamChat::StreamResponse) } def unarchive(filter, members) @client.update_channels_batch( { diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index b13c8d2..13cad80 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -18,19 +18,31 @@ def loop_times(times) end end + def rate_limit_error?(task) + result = task['result'] + return false unless result.is_a?(Hash) + + description = result['description'] + return false unless description.is_a?(String) + + description.downcase.include?('rate limit') + end + + def fetch_task_with_retry(task_id, attempt) + @client.get_task(task_id) + rescue StandardError => e + raise e if attempt >= 10 + + sleep(1) + nil + end + def wait_for_task(task_id, timeout_seconds: 120) sleep(2) # Initial delay timeout_seconds.times do |i| - begin - task = @client.get_task(task_id) - rescue StandardError => e - if i < 10 - sleep(1) - next - end - raise e - end + task = fetch_task_with_retry(task_id, i) + next if task.nil? expect(task['id']).to eq(task_id) @@ -40,11 +52,9 @@ def wait_for_task(task_id, timeout_seconds: 120) when 'completed' return task when 'failed' - if task['result']&.dig('description')&.downcase&.include?('rate limit') - sleep(2) - next - end - raise "Task failed with result: #{task['result']}" + raise "Task failed with result: #{task['result']}" unless rate_limit_error?(task) + + sleep(2) end end @@ -108,7 +118,7 @@ def wait_for_task(task_id, timeout_seconds: 120) { operation: 'addMembers', filter: { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, - members: [@random_users[0][:id]] + members: [{ 'user_id' => @random_users[0][:id] }] } ) @@ -120,7 +130,8 @@ def wait_for_task(task_id, timeout_seconds: 120) it 'adds members to channels matching filter' do updater = @client.channel_batch_updater - members = @random_users.map { |u| u[:id] } + member_ids = @random_users.map { |u| u[:id] } + members = member_ids.map { |id| { 'user_id' => id } } response = updater.add_members( { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, members @@ -136,7 +147,7 @@ def wait_for_task(task_id, timeout_seconds: 120) ch1_state = @channel1.query ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } - members.each do |member_id| + member_ids.each do |member_id| expect(ch1_member_ids).to include(member_id) end end @@ -174,7 +185,7 @@ def wait_for_task(task_id, timeout_seconds: 120) response = updater.remove_members( { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, - [member_to_remove] + [{ 'user_id' => member_to_remove }] ) expect(response['task_id']).not_to be_empty @@ -211,7 +222,7 @@ def wait_for_task(task_id, timeout_seconds: 120) response = updater.archive( { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, - [member_to_archive] + [{ 'user_id' => member_to_archive }] ) expect(response['task_id']).not_to be_empty From 1b163fd428ea3d369da7a5d08e88981efa3297f4 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 10:09:08 +0100 Subject: [PATCH 07/14] fix: tests --- spec/channel_batch_updater_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index 13cad80..7e321e5 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -44,7 +44,7 @@ def wait_for_task(task_id, timeout_seconds: 120) task = fetch_task_with_retry(task_id, i) next if task.nil? - expect(task['id']).to eq(task_id) + expect(task['task_id']).to eq(task_id) case task['status'] when 'waiting', 'pending', 'running' From b472b551cee7d3ae50516a64f8574fef4d7377ee Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 11:39:26 +0100 Subject: [PATCH 08/14] fix: tests --- spec/channel_batch_updater_spec.rb | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index 7e321e5..9f1c611 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -52,9 +52,15 @@ def wait_for_task(task_id, timeout_seconds: 120) when 'completed' return task when 'failed' - raise "Task failed with result: #{task['result']}" unless rate_limit_error?(task) - - sleep(2) + # If result is empty, continue polling (matches Go behavior) + result = task['result'] + if result.nil? || (result.is_a?(Hash) && result.empty?) + sleep(2) + elsif rate_limit_error?(task) + sleep(2) + else + raise "Task failed with result: #{task['result']}" + end end end From dac08ed40a7fe19e4bb8d7ef5177798e70ef7f79 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 12:50:00 +0100 Subject: [PATCH 09/14] fix: update members --- lib/stream-chat/channel.rb | 11 ++++++++++ spec/channel_batch_updater_spec.rb | 32 +++++++++++++++--------------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/lib/stream-chat/channel.rb b/lib/stream-chat/channel.rb index 6fe2eb6..93a4a86 100644 --- a/lib/stream-chat/channel.rb +++ b/lib/stream-chat/channel.rb @@ -97,6 +97,17 @@ def query(**options) state end + # Refreshes the channel state from the server. + # Updates the channel's members attribute with fresh data. + sig { returns(StreamChat::StreamResponse) } + def refresh_state + url = "channels/#{@channel_type}/#{@id}/query" + state = @client.post(url, data: { state: true }) + + @members = state['members'] if state['members'] + state + end + # Queries members of a channel. # # The queryMembers endpoint allows you to list and paginate members from a channel. The diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index 9f1c611..2e17483 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -150,8 +150,8 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify members were added loop_times(120) do - ch1_state = @channel1.query - ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + @channel1.refresh_state + ch1_member_ids = @channel1.members.map { |m| m['user_id'] } member_ids.each do |member_id| expect(ch1_member_ids).to include(member_id) @@ -169,20 +169,20 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify members were added loop_times(60) do - ch1_state = @channel1.query - expect(ch1_state['members'].length).to eq(2) + @channel1.refresh_state + expect(@channel1.members.length).to eq(2) - ch2_state = @channel2.query - expect(ch2_state['members'].length).to eq(2) + @channel2.refresh_state + expect(@channel2.members.length).to eq(2) end # Verify member IDs match - ch1_state = @channel1.query - ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + @channel1.refresh_state + ch1_member_ids = @channel1.members.map { |m| m['user_id'] } expect(ch1_member_ids).to match_array(members_to_add) - ch2_state = @channel2.query - ch2_member_ids = ch2_state['members'].map { |m| m['user_id'] } + @channel2.refresh_state + ch2_member_ids = @channel2.members.map { |m| m['user_id'] } expect(ch2_member_ids).to match_array(members_to_add) # Now remove one member using batch updater @@ -201,8 +201,8 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify member was removed loop_times(120) do - ch1_state = @channel1.query - ch1_member_ids = ch1_state['members'].map { |m| m['user_id'] } + @channel1.refresh_state + ch1_member_ids = @channel1.members.map { |m| m['user_id'] } expect(ch1_member_ids).not_to include(member_to_remove) end @@ -218,8 +218,8 @@ def wait_for_task(task_id, timeout_seconds: 120) # Wait for members to be added loop_times(60) do - ch1_state = @channel1.query - expect(ch1_state['members'].length).to eq(2) + @channel1.refresh_state + expect(@channel1.members.length).to eq(2) end # Archive channels for one member @@ -238,8 +238,8 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify archived_at is set for the member loop_times(120) do - ch1_state = @channel1.query - member = ch1_state['members'].find { |m| m['user_id'] == member_to_archive } + @channel1.refresh_state + member = @channel1.members.find { |m| m['user_id'] == member_to_archive } expect(member).not_to be_nil expect(member['archived_at']).not_to be_nil From e09167fe3824929bab6b48cf99cab82f06d4e572 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 14:25:48 +0100 Subject: [PATCH 10/14] fix: try members --- lib/stream-chat/channel.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/stream-chat/channel.rb b/lib/stream-chat/channel.rb index 93a4a86..789a126 100644 --- a/lib/stream-chat/channel.rb +++ b/lib/stream-chat/channel.rb @@ -101,9 +101,7 @@ def query(**options) # Updates the channel's members attribute with fresh data. sig { returns(StreamChat::StreamResponse) } def refresh_state - url = "channels/#{@channel_type}/#{@id}/query" - state = @client.post(url, data: { state: true }) - + state = query_members @members = state['members'] if state['members'] state end From ea714d906967d6cbe483c772f45bd1078b60f222 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Wed, 21 Jan 2026 15:30:35 +0100 Subject: [PATCH 11/14] fix: use correct values --- spec/channel_batch_updater_spec.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index 2e17483..2521b2f 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -151,7 +151,7 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify members were added loop_times(120) do @channel1.refresh_state - ch1_member_ids = @channel1.members.map { |m| m['user_id'] } + ch1_member_ids = @channel1.members.map { |m| m['user']['id'] } member_ids.each do |member_id| expect(ch1_member_ids).to include(member_id) @@ -178,11 +178,11 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify member IDs match @channel1.refresh_state - ch1_member_ids = @channel1.members.map { |m| m['user_id'] } + ch1_member_ids = @channel1.members.map { |m| m['user']['id'] } expect(ch1_member_ids).to match_array(members_to_add) @channel2.refresh_state - ch2_member_ids = @channel2.members.map { |m| m['user_id'] } + ch2_member_ids = @channel2.members.map { |m| m['user']['id'] } expect(ch2_member_ids).to match_array(members_to_add) # Now remove one member using batch updater @@ -202,7 +202,7 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify member was removed loop_times(120) do @channel1.refresh_state - ch1_member_ids = @channel1.members.map { |m| m['user_id'] } + ch1_member_ids = @channel1.members.map { |m| m['user']['id'] } expect(ch1_member_ids).not_to include(member_to_remove) end @@ -239,7 +239,7 @@ def wait_for_task(task_id, timeout_seconds: 120) # Verify archived_at is set for the member loop_times(120) do @channel1.refresh_state - member = @channel1.members.find { |m| m['user_id'] == member_to_archive } + member = @channel1.members.find { |m| m['user']['id'] == member_to_archive } expect(member).not_to be_nil expect(member['archived_at']).not_to be_nil From 079e1990ff31db88ecb020cfc77dd5989bb4587f Mon Sep 17 00:00:00 2001 From: javierdfm Date: Thu, 22 Jan 2026 11:47:24 +0100 Subject: [PATCH 12/14] fix: removed loop_times --- spec/channel_batch_updater_spec.rb | 49 ++++++++++++++++++------------ 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index 2521b2f..3b44081 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -4,20 +4,6 @@ require 'stream-chat' describe StreamChat::ChannelBatchUpdater do - def loop_times(times) - loop do - begin - yield() - return - rescue StandardError, RSpec::Expectations::ExpectationNotMetError - raise if times.zero? - end - - sleep(1) - times -= 1 - end - end - def rate_limit_error?(task) result = task['result'] return false unless result.is_a?(Hash) @@ -149,13 +135,18 @@ def wait_for_task(task_id, timeout_seconds: 120) wait_for_task(task_id) # Verify members were added - loop_times(120) do + 120.times do |i| @channel1.refresh_state ch1_member_ids = @channel1.members.map { |m| m['user']['id'] } member_ids.each do |member_id| expect(ch1_member_ids).to include(member_id) end + break + rescue StandardError, RSpec::Expectations::ExpectationNotMetError => e + raise e if i == 119 + + sleep(1) end end end @@ -168,12 +159,17 @@ def wait_for_task(task_id, timeout_seconds: 120) @channel2.add_members(members_to_add) # Verify members were added - loop_times(60) do + 60.times do |i| @channel1.refresh_state expect(@channel1.members.length).to eq(2) @channel2.refresh_state expect(@channel2.members.length).to eq(2) + break + rescue StandardError, RSpec::Expectations::ExpectationNotMetError => e + raise e if i == 59 + + sleep(1) end # Verify member IDs match @@ -200,11 +196,16 @@ def wait_for_task(task_id, timeout_seconds: 120) wait_for_task(task_id) # Verify member was removed - loop_times(120) do + 120.times do |i| @channel1.refresh_state ch1_member_ids = @channel1.members.map { |m| m['user']['id'] } expect(ch1_member_ids).not_to include(member_to_remove) + break + rescue StandardError, RSpec::Expectations::ExpectationNotMetError => e + raise e if i == 119 + + sleep(1) end end end @@ -217,9 +218,14 @@ def wait_for_task(task_id, timeout_seconds: 120) @channel2.add_members(members_to_add) # Wait for members to be added - loop_times(60) do + 60.times do |i| @channel1.refresh_state expect(@channel1.members.length).to eq(2) + break + rescue StandardError, RSpec::Expectations::ExpectationNotMetError => e + raise e if i == 59 + + sleep(1) end # Archive channels for one member @@ -237,12 +243,17 @@ def wait_for_task(task_id, timeout_seconds: 120) wait_for_task(task_id) # Verify archived_at is set for the member - loop_times(120) do + 120.times do |i| @channel1.refresh_state member = @channel1.members.find { |m| m['user']['id'] == member_to_archive } expect(member).not_to be_nil expect(member['archived_at']).not_to be_nil + break + rescue StandardError, RSpec::Expectations::ExpectationNotMetError => e + raise e if i == 119 + + sleep(1) end end end From aac72b1841868191290f97fd67f03aacc8819f2c Mon Sep 17 00:00:00 2001 From: javierdfm Date: Thu, 22 Jan 2026 13:47:42 +0100 Subject: [PATCH 13/14] fix: use correct params --- lib/stream-chat/channel.rb | 11 +++++++++-- spec/channel_batch_updater_spec.rb | 8 ++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/lib/stream-chat/channel.rb b/lib/stream-chat/channel.rb index 789a126..838a569 100644 --- a/lib/stream-chat/channel.rb +++ b/lib/stream-chat/channel.rb @@ -101,8 +101,15 @@ def query(**options) # Updates the channel's members attribute with fresh data. sig { returns(StreamChat::StreamResponse) } def refresh_state - state = query_members - @members = state['members'] if state['members'] + url = "channels/#{@channel_type}/#{@id}/query" + state = @client.post(url, data: { state: true }) + + # Members can be at top level or inside channel object (like Go's updateChannel) + if state['members'] && !state['members'].empty? + @members = state['members'] + elsif state['channel'] && state['channel']['members'] + @members = state['channel']['members'] + end state end diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index 3b44081..d79d23f 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -109,7 +109,7 @@ def wait_for_task(task_id, timeout_seconds: 120) response = @client.update_channels_batch( { operation: 'addMembers', - filter: { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + filter: { cids: { '$in' => [@channel1.cid, @channel2.cid] } }, members: [{ 'user_id' => @random_users[0][:id] }] } ) @@ -125,7 +125,7 @@ def wait_for_task(task_id, timeout_seconds: 120) member_ids = @random_users.map { |u| u[:id] } members = member_ids.map { |id| { 'user_id' => id } } response = updater.add_members( - { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + { cids: { '$in' => [@channel1.cid, @channel2.cid] } }, members ) @@ -186,7 +186,7 @@ def wait_for_task(task_id, timeout_seconds: 120) member_to_remove = members_to_add[0] response = updater.remove_members( - { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + { cids: { '$in' => [@channel1.cid, @channel2.cid] } }, [{ 'user_id' => member_to_remove }] ) @@ -233,7 +233,7 @@ def wait_for_task(task_id, timeout_seconds: 120) member_to_archive = members_to_add[0] response = updater.archive( - { cid: { '$in' => [@channel1.cid, @channel2.cid] } }, + { cids: { '$in' => [@channel1.cid, @channel2.cid] } }, [{ 'user_id' => member_to_archive }] ) From 93baff34de7d3ed2b066310d91d57f6d57449556 Mon Sep 17 00:00:00 2001 From: javierdfm Date: Thu, 22 Jan 2026 13:58:22 +0100 Subject: [PATCH 14/14] fix: rubocop --- spec/channel_batch_updater_spec.rb | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/spec/channel_batch_updater_spec.rb b/spec/channel_batch_updater_spec.rb index d79d23f..95ea379 100644 --- a/spec/channel_batch_updater_spec.rb +++ b/spec/channel_batch_updater_spec.rb @@ -23,6 +23,13 @@ def fetch_task_with_retry(task_id, attempt) nil end + def transient_failure?(task) + result = task['result'] + return true if result.nil? || (result.is_a?(Hash) && result.empty?) + + rate_limit_error?(task) + end + def wait_for_task(task_id, timeout_seconds: 120) sleep(2) # Initial delay @@ -38,15 +45,9 @@ def wait_for_task(task_id, timeout_seconds: 120) when 'completed' return task when 'failed' - # If result is empty, continue polling (matches Go behavior) - result = task['result'] - if result.nil? || (result.is_a?(Hash) && result.empty?) - sleep(2) - elsif rate_limit_error?(task) - sleep(2) - else - raise "Task failed with result: #{task['result']}" - end + raise "Task failed with result: #{task['result']}" unless transient_failure?(task) + + sleep(2) end end