Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ Composer shortcuts: `composer test`, `composer analyse`, `composer cs`, `compose
Console agent that runs end to end: Config, async HTTP with cause-aware retry, Claude &
DeepSeek backends, the tool set (`bash`, `read_file`, `write_file`, `list_files`, `date`,
`php_eval`, `schedule`), the session loop, a permission gatekeeper (confirm + persisted
"always" rules), and per-conversation SQLite persistence (history survives restarts). Next:
a Telegram channel (chat-id allowlist) and an audit log of tool calls.
"always" rules), per-conversation SQLite persistence (history survives restarts), and a
Telegram channel (chat-id allowlist, long-poll, "typing…" indicator). Next: an audit log of
tool calls.
58 changes: 37 additions & 21 deletions bin/claw
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@

declare(strict_types=1);

use function Async\spawn;

use Claw\Agent\AgentInterface;
use Claw\Agent\ClaudeAgent;
use Claw\Agent\OpenAiCompatibleAgent;
use Claw\Chat\ConsoleChat;
use Claw\Chat\ConversationInterface;
use Claw\Chat\TelegramChat;
use Claw\Chat\TelegramClient;
use Claw\Config;
use Claw\Exceptions\ClawException;
use Claw\Http\CurlHttpClient;
Expand Down Expand Up @@ -55,46 +60,57 @@ if ($agent === null) {
}

$workspace = new Workspace($workspaceDir);
$tools = new Registry();
$tools->add(new BashTool($workspaceDir));
$tools->add(new ReadFileTool($workspace));
$tools->add(new WriteFileTool($workspace));
$tools->add(new ListFilesTool($workspace));
$tools->add(new DateTool());
$tools->add(new PhpEvalTool());

$persona = __DIR__ . '/../CLAUDE.md';
$system = is_file($persona) ? (string) file_get_contents($persona) : DEFAULT_SYSTEM;

$chat = match ($config->channel) {
'console' => new ConsoleChat(),
default => null, // telegram not wired yet
'telegram' => new TelegramChat(
new TelegramClient($http, $config->telegramToken),
$config->isChatAllowed(...), // authorization: drop anyone not on the allowlist
),
default => null,
};
if ($chat === null) {
fwrite(STDERR, "Channel '{$config->channel}' is not wired yet.\n");
exit(1);
}

// One SQLite file per conversation, so history survives restarts. The console is
// a single conversation; the Telegram gateway will open one file per chat_id.
// One SQLite file per conversation (keyed by its id), so history survives restarts.
$sessionsDir = $workspaceDir . '/sessions';
if (!is_dir($sessionsDir)) {
mkdir($sessionsDir, 0o775, true);
}
try {
$store = new SessionStore($sessionsDir . '/console.db');
} catch (ClawException $e) {
fwrite(STDERR, 'Store error: ' . $e->getMessage() . "\n");
exit(1);
}

$conversation = $chat->accept();
// Build the per-conversation tool set + store and run its session. Tools are
// per-conversation because `schedule` delivers reminders to that exact chat.
$runSession = static function (ConversationInterface $conversation) use ($agent, $workspace, $workspaceDir, $system, $config, $sessionsDir): void {
$tools = new Registry();
$tools->add(new BashTool($workspaceDir));
$tools->add(new ReadFileTool($workspace));
$tools->add(new WriteFileTool($workspace));
$tools->add(new ListFilesTool($workspace));
$tools->add(new DateTool());
$tools->add(new PhpEvalTool());
$tools->add(new ScheduleTool($conversation->send(...)));

$store = new SessionStore($sessionsDir . '/' . $conversation->id() . '.db');

new Session($conversation, $agent, $tools, $system, $config->model, $config->maxHistory, store: $store)->run();
};

// The schedule tool delivers reminders straight to the user, so it's wired with
// this conversation's send() as its sink.
$tools->add(new ScheduleTool($conversation->send(...)));
if ($chat instanceof TelegramChat) {
// Many chats: long-poll in the background, then one Session per authorized chat.
spawn($chat->poll(...));
for (;;) {
$conversation = $chat->accept();
spawn(static fn () => $runSession($conversation));
}
}

new Session($conversation, $agent, $tools, $system, $config->model, $config->maxHistory, store: $store)->run();
// Console: a single conversation, run inline.
$runSession($chat->accept());

function makeAgent(Config $config, HttpClientInterface $http): ?AgentInterface
{
Expand Down
5 changes: 5 additions & 0 deletions src/Chat/AsyncConsoleConversation.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public function __construct()
$this->reader = spawn($this->readLoop(...));
}

public function id(): string
{
return 'console';
}

public function receive(): ?string
{
// The background reader (readLoop) does the actual input; here we just
Expand Down
5 changes: 5 additions & 0 deletions src/Chat/ConsoleConversation.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public function __construct($input, $output)
$this->output = $output;
}

public function id(): string
{
return 'console';
}

public function receive(): ?string
{
// Push any visible status (e.g. token count) into scroll history before
Expand Down
3 changes: 3 additions & 0 deletions src/Chat/ConversationInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
*/
interface ConversationInterface
{
/** A stable identifier for this conversation, used to key its persistence (the store file). */
public function id(): string;

/** Next message from this chat, or null when the conversation is closed. May await. */
public function receive(): ?string;

Expand Down
107 changes: 107 additions & 0 deletions src/Chat/TelegramChat.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
<?php

declare(strict_types=1);

namespace Claw\Chat;

use function Async\delay;

/**
* Telegram gateway. A single long-poll loop pulls updates and routes each one:
* authorize the sender against the allowlist, demultiplex by chat_id, and queue
* the text on that chat's Conversation. A new authorized chat surfaces from
* accept(); the main loop spawns one Session per Conversation.
*
* Authorization is the must-have: a bot is public, so an unauthorized sender is
* silently dropped (no reply — a reply would confirm the bot to a prober). DMs
* only for now; group updates are ignored.
*/
final class TelegramChat implements ChatInterface
{
/** @var array<int, TelegramConversation> chat_id => conversation */
private array $conversations = [];

/** @var list<TelegramConversation> authorized chats awaiting accept() */
private array $pending = [];

private int $offset = 0;

/**
* @param \Closure(int): bool $isAllowed authorizes a sender by user id
*/
public function __construct(
private readonly TelegramClient $client,
private readonly \Closure $isAllowed,
) {
}

/** Long-poll Telegram forever, routing each update. Spawn this once. */
public function poll(): void
{
for (;;) {
try {
foreach ($this->client->getUpdates($this->offset) as $update) {
$this->offset = max($this->offset, (int) ($update['update_id'] ?? 0) + 1);
$this->ingest($update);
}
} catch (\Throwable $e) {
// Network blip or API hiccup: back off so we don't hot-loop.
fwrite(STDERR, 'telegram: poll error: ' . $e->getMessage() . "\n");
delay(2000);
}
}
}

public function accept(): ConversationInterface
{
while ($this->pending === []) {
delay(50);
}

return array_shift($this->pending);
}

/**
* Route one raw Telegram update: authorize, demultiplex by chat, queue the
* text. Unauthorized senders, non-DM chats and non-text messages are dropped.
* Driven by poll(); public so the routing can be tested without the network.
*
* @param array<string, mixed> $update
*/
public function ingest(array $update): void
{
$message = $update['message'] ?? null;
if (!\is_array($message)) {
return;
}

$chat = \is_array($message['chat'] ?? null) ? $message['chat'] : [];
if (($chat['type'] ?? null) !== 'private') {
return; // DMs only for now
}

$from = \is_array($message['from'] ?? null) ? $message['from'] : [];
$userId = (int) ($from['id'] ?? 0);
if (!($this->isAllowed)($userId)) {
// Silent drop. Log the id so the owner can find it for the allowlist.
fwrite(STDERR, "telegram: dropped message from unauthorized id {$userId}\n");

return;
}

$text = $message['text'] ?? null;
if (!\is_string($text) || trim($text) === '') {
return;
}

$chatId = (int) ($chat['id'] ?? $userId);
$conversation = $this->conversations[$chatId] ?? null;
if ($conversation === null) {
$conversation = new TelegramConversation($chatId, $this->client);
$this->conversations[$chatId] = $conversation;
$this->pending[] = $conversation; // accept() hands it to a new Session
}

$conversation->deliver(trim($text));
}
}
82 changes: 82 additions & 0 deletions src/Chat/TelegramClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

declare(strict_types=1);

namespace Claw\Chat;

use Claw\Exceptions\HttpException;
use Claw\Http\HttpClientInterface;

/**
* Thin wrapper over the Telegram Bot API (plain HTTPS): long-poll for updates and
* send messages. Uses the shared async HTTP client, so getUpdates parks the
* coroutine for the long-poll duration without blocking the thread.
*/
final readonly class TelegramClient
{
private string $base;

public function __construct(
private HttpClientInterface $http,
string $token,
) {
$this->base = 'https://api.telegram.org/bot' . $token . '/';
}

/**
* Long-poll for message updates after $offset.
*
* @return list<array<string, mixed>> the raw updates (may be empty)
*
* @throws HttpException on transport failure or a Telegram API error
*/
public function getUpdates(int $offset, int $timeoutSeconds = 25): array
{
$url = $this->base . 'getUpdates?' . http_build_query([
'offset' => $offset,
'timeout' => $timeoutSeconds,
'allowed_updates' => '["message"]',
]);

$data = $this->http->get($url)->json();
if (($data['ok'] ?? false) !== true) {
throw new HttpException('Telegram getUpdates failed: ' . json_encode($data['description'] ?? $data));
}

$result = $data['result'] ?? [];
if (!\is_array($result)) {
return [];
}

$updates = [];
foreach ($result as $item) {
if (\is_array($item)) {
/** @var array<string, mixed> $item */
$updates[] = $item;
}
}

return $updates;
}

public function sendMessage(int $chatId, string $text): void
{
$this->call('sendMessage', ['chat_id' => $chatId, 'text' => $text]);
}

/** Show a transient chat action (e.g. "typing…"); Telegram clears it after ~5s. */
public function sendChatAction(int $chatId, string $action): void
{
$this->call('sendChatAction', ['chat_id' => $chatId, 'action' => $action]);
}

/**
* @param array<string, mixed> $payload
*/
private function call(string $method, array $payload): void
{
$body = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_THROW_ON_ERROR);

$this->http->post($this->base . $method, $body, ['Content-Type: application/json']);
}
}
Loading
Loading