♻️ Refactored the thinking message part

2025-11-15 16:21:26 +08:00
parent b5f9faa724
commit 80ea44f2cc
8 changed files with 197 additions and 127 deletions

View File

@@ -69,11 +69,6 @@ namespace DysonNetwork.Insight.Migrations
.HasColumnType("uuid") .HasColumnType("uuid")
.HasColumnName("id"); .HasColumnName("id");
b.Property<List<SnThinkingChunk>>("Chunks")
.IsRequired()
.HasColumnType("jsonb")
.HasColumnName("chunks");
b.Property<string>("Content") b.Property<string>("Content")
.HasColumnType("text") .HasColumnType("text")
.HasColumnName("content"); .HasColumnName("content");

View File

@@ -12,21 +12,13 @@ namespace DysonNetwork.Insight.Migrations
         /// <inheritdoc />
         protected override void Up(MigrationBuilder migrationBuilder)
         {
-            migrationBuilder.AddColumn<List<SnThinkingChunk>>(
-                name: "chunks",
-                table: "thinking_thoughts",
-                type: "jsonb",
-                nullable: false,
-                defaultValue: new List<SnThinkingChunk>()
-            );
+            // The chunk type has been removed, so this did nothing
         }

         /// <inheritdoc />
         protected override void Down(MigrationBuilder migrationBuilder)
         {
-            migrationBuilder.DropColumn(
-                name: "chunks",
-                table: "thinking_thoughts");
+            // The chunk type has been removed, so this did nothing
         }
     }
 }

View File

@@ -77,11 +77,6 @@ namespace DysonNetwork.Insight.Migrations
.HasColumnType("uuid") .HasColumnType("uuid")
.HasColumnName("id"); .HasColumnName("id");
b.Property<List<SnThinkingChunk>>("Chunks")
.IsRequired()
.HasColumnType("jsonb")
.HasColumnName("chunks");
b.Property<string>("Content") b.Property<string>("Content")
.HasColumnType("text") .HasColumnType("text")
.HasColumnName("content"); .HasColumnName("content");

View File

@@ -74,14 +74,14 @@ namespace DysonNetwork.Insight.Migrations
.HasColumnType("uuid") .HasColumnType("uuid")
.HasColumnName("id"); .HasColumnName("id");
b.Property<List<SnThinkingChunk>>("Chunks") // b.Property<List<SnThinkingChunk>>("Chunks")
.IsRequired() // .IsRequired()
.HasColumnType("jsonb") // .HasColumnType("jsonb")
.HasColumnName("chunks"); // .HasColumnName("chunks");
b.Property<string>("Content") // b.Property<string>("Content")
.HasColumnType("text") // .HasColumnType("text")
.HasColumnName("content"); // .HasColumnName("content");
b.Property<Instant>("CreatedAt") b.Property<Instant>("CreatedAt")
.HasColumnType("timestamp with time zone") .HasColumnType("timestamp with time zone")

View File

@@ -1,7 +1,5 @@
-using System.Collections.Generic;
 using System.ComponentModel.DataAnnotations;
 using System.Diagnostics.CodeAnalysis;
-using System.IO;
 using System.Text;
 using System.Text.Json;
 using DysonNetwork.Shared.Models;
@@ -9,7 +7,6 @@ using DysonNetwork.Shared.Proto;
 using Microsoft.AspNetCore.Mvc;
 using Microsoft.SemanticKernel;
 using Microsoft.SemanticKernel.ChatCompletion;
-using Microsoft.SemanticKernel.Connectors.Ollama;

 namespace DysonNetwork.Insight.Thought;
@@ -61,7 +58,14 @@ public class ThoughtController(ThoughtProvider provider, ThoughtService service)
         if (sequence == null) return Forbid(); // or NotFound

         // Save user thought
-        await service.SaveThoughtAsync(sequence, request.UserMessage, ThinkingThoughtRole.User);
+        await service.SaveThoughtAsync(sequence, new List<SnThinkingMessagePart>
+        {
+            new()
+            {
+                Type = ThinkingMessagePartType.Text,
+                Text = request.UserMessage
+            }
+        }, ThinkingThoughtRole.User);

         // Build chat history
         var chatHistory = new ChatHistory(
@@ -111,16 +115,48 @@ public class ThoughtController(ThoughtProvider provider, ThoughtService service)
         for (var i = 1; i < count; i++) // skip first (the newest, current user)
         {
             var thought = previousThoughts[i];
-            switch (thought.Role)
-            {
-                case ThinkingThoughtRole.User:
-                    chatHistory.AddUserMessage(thought.Content ?? "");
-                    break;
-                case ThinkingThoughtRole.Assistant:
-                    chatHistory.AddAssistantMessage(thought.Content ?? "");
-                    break;
-                default:
-                    throw new ArgumentOutOfRangeException();
-            }
+            var textContent = new StringBuilder();
+            var functionCalls = new List<FunctionCallContent>();
+            var hasFunctionCalls = false;
+
+            foreach (var part in thought.Parts)
+            {
+                switch (part.Type)
+                {
+                    case ThinkingMessagePartType.Text:
+                        textContent.Append(part.Text);
+                        break;
+                    case ThinkingMessagePartType.FunctionCall:
+                        hasFunctionCalls = true;
+                        functionCalls.Add(new FunctionCallContent(part.FunctionCall!.Name, part.FunctionCall.Arguments,
+                            part.FunctionCall.Id));
+                        break;
+                    case ThinkingMessagePartType.FunctionResult:
+                        var resultObject = part.FunctionResult!.Result;
+                        var resultString = resultObject is string s ? s : JsonSerializer.Serialize(resultObject);
+                        var result = new FunctionResultContent(part.FunctionResult!.CallId, resultString);
+                        chatHistory.Add(result.ToChatMessage());
+                        break;
+                }
+            }
+
+            if (thought.Role == ThinkingThoughtRole.User)
+            {
+                chatHistory.AddUserMessage(textContent.ToString());
+            }
+            else
+            {
+                var assistantMessage = new ChatMessageContent(AuthorRole.Assistant, textContent.ToString());
+                if (hasFunctionCalls)
+                {
+                    assistantMessage.Items = new ChatMessageContentItemCollection();
+                    foreach (var fc in functionCalls)
+                    {
+                        assistantMessage.Items.Add(fc);
+                    }
+                }
+                chatHistory.Add(assistantMessage);
+            }
         }
     }
@@ -132,72 +168,120 @@ public class ThoughtController(ThoughtProvider provider, ThoughtService service)
         var kernel = provider.Kernel;
         var chatCompletionService = kernel.GetRequiredService<IChatCompletionService>();
+        var executionSettings = provider.CreatePromptExecutionSettings();

-        // Kick off streaming generation
-        var accumulatedContent = new StringBuilder();
-        var thinkingChunks = new List<SnThinkingChunk>();
-        await foreach (var chunk in chatCompletionService.GetStreamingChatMessageContentsAsync(
-            chatHistory,
-            provider.CreatePromptExecutionSettings(),
-            kernel: kernel
-        ))
+        var assistantParts = new List<SnThinkingMessagePart>();
+
+        while (true)
         {
-            // Process each item in the chunk for detailed streaming
-            foreach (var item in chunk.Items)
+            var textContentBuilder = new StringBuilder();
+            AuthorRole? authorRole = null;
+            var functionCallBuilder = new FunctionCallContentBuilder();
+
+            await foreach (var streamingContent in chatCompletionService.GetStreamingChatMessageContentsAsync(
+                               chatHistory, executionSettings, kernel))
             {
-                var streamingChunk = item switch
+                authorRole ??= streamingContent.Role;
+
+                if (streamingContent.Content is not null)
                 {
-                    StreamingTextContent textContent => new SnThinkingChunk
-                        { Type = StreamingContentType.Text, Data = new() { ["text"] = textContent.Text ?? "" } },
-                    StreamingReasoningContent reasoningContent => new SnThinkingChunk
-                    {
-                        Type = StreamingContentType.Reasoning, Data = new() { ["text"] = reasoningContent.Text }
-                    },
-                    StreamingFunctionCallUpdateContent functionCall => string.IsNullOrEmpty(functionCall.CallId)
-                        ? null
-                        : new SnThinkingChunk
-                        {
-                            Type = StreamingContentType.FunctionCall,
-                            Data = JsonSerializer.Deserialize<Dictionary<string, object>>(
-                                JsonSerializer.Serialize(functionCall)) ?? new Dictionary<string, object>()
-                        },
-                    _ => new SnThinkingChunk
-                    {
-                        Type = StreamingContentType.Unknown, Data = new() { ["data"] = JsonSerializer.Serialize(item) }
-                    }
-                };
-                if (streamingChunk == null) continue;
-                thinkingChunks.Add(streamingChunk);
-                var messageJson = item switch
+                    textContentBuilder.Append(streamingContent.Content);
+                    var messageJson = JsonSerializer.Serialize(new
+                        { type = "text", data = streamingContent.Content });
+                    await Response.Body.WriteAsync(Encoding.UTF8.GetBytes($"data: {messageJson}\n\n"));
+                    await Response.Body.FlushAsync();
+                }
+
+                if (streamingContent.Items.Count > 0)
                 {
-                    StreamingTextContent textContent =>
-                        JsonSerializer.Serialize(new { type = "text", data = textContent.Text ?? "" }),
-                    StreamingReasoningContent reasoningContent =>
-                        JsonSerializer.Serialize(new { type = "reasoning", data = reasoningContent.Text }),
-                    StreamingFunctionCallUpdateContent functionCall =>
-                        JsonSerializer.Serialize(new { type = "function_call", data = functionCall }),
-                    _ =>
-                        JsonSerializer.Serialize(new { type = "unknown", data = item })
-                };
-                // Write a structured JSON message to the HTTP response as SSE
-                var messageBytes = Encoding.UTF8.GetBytes($"data: {messageJson}\n\n");
-                await Response.Body.WriteAsync(messageBytes);
-                await Response.Body.FlushAsync();
+                    functionCallBuilder.Append(streamingContent);
+                }
+
+                foreach (var functionCallUpdate in streamingContent.Items.OfType<StreamingFunctionCallUpdateContent>())
+                {
+                    var messageJson = JsonSerializer.Serialize(new
+                        { type = "function_call_update", data = functionCallUpdate });
+                    await Response.Body.WriteAsync(Encoding.UTF8.GetBytes($"data: {messageJson}\n\n"));
+                    await Response.Body.FlushAsync();
+                }
             }

-            // Accumulate content for saving (only text content)
-            accumulatedContent.Append(chunk.Content ?? "");
+            var finalMessageText = textContentBuilder.ToString();
+            if (!string.IsNullOrEmpty(finalMessageText))
+            {
+                assistantParts.Add(new SnThinkingMessagePart
+                    { Type = ThinkingMessagePartType.Text, Text = finalMessageText });
+            }
+
+            var functionCalls = functionCallBuilder.Build();
+            if (functionCalls.Count == 0)
+            {
+                break;
+            }
+
+            var assistantMessage = new ChatMessageContent(authorRole ?? AuthorRole.Assistant,
+                string.IsNullOrEmpty(finalMessageText) ? null : finalMessageText);
+            foreach (var functionCall in functionCalls)
+            {
+                assistantMessage.Items.Add(functionCall);
+            }
+            chatHistory.Add(assistantMessage);
+
+            foreach (var functionCall in functionCalls)
+            {
+                var part = new SnThinkingMessagePart
+                {
+                    Type = ThinkingMessagePartType.FunctionCall,
+                    FunctionCall = new SnFunctionCall
+                    {
+                        Id = functionCall.Id!,
+                        Name = functionCall.FunctionName!,
+                        Arguments = JsonSerializer.Serialize(functionCall.Arguments)
+                    }
+                };
+                assistantParts.Add(part);
+
+                var messageJson = JsonSerializer.Serialize(new { type = "function_call", data = part.FunctionCall });
+                await Response.Body.WriteAsync(Encoding.UTF8.GetBytes($"data: {messageJson}\n\n"));
+                await Response.Body.FlushAsync();
+
+                FunctionResultContent resultContent;
+                try
+                {
+                    resultContent = await functionCall.InvokeAsync(kernel);
+                }
+                catch (Exception ex)
+                {
+                    resultContent = new FunctionResultContent(functionCall.Id!, ex.Message);
+                }
+
+                chatHistory.Add(resultContent.ToChatMessage());
+
+                var resultPart = new SnThinkingMessagePart
+                {
+                    Type = ThinkingMessagePartType.FunctionResult,
+                    FunctionResult = new SnFunctionResult
+                    {
+                        CallId = resultContent.CallId,
+                        Result = resultContent.Result!,
+                        IsError = resultContent.Result is Exception
+                    }
+                };
+                assistantParts.Add(resultPart);
+
+                var resultMessageJson =
+                    JsonSerializer.Serialize(new { type = "function_result", data = resultPart.FunctionResult });
+                await Response.Body.WriteAsync(Encoding.UTF8.GetBytes($"data: {resultMessageJson}\n\n"));
+                await Response.Body.FlushAsync();
+            }
         }

         // Save assistant thought
         var savedThought = await service.SaveThoughtAsync(
             sequence,
-            accumulatedContent.ToString(),
+            assistantParts,
             ThinkingThoughtRole.Assistant,
-            thinkingChunks,
             provider.ModelDefault
         );
@@ -209,7 +293,6 @@ public class ThoughtController(ThoughtProvider provider, ThoughtService service)
         {
             var topicJson = JsonSerializer.Serialize(new { type = "topic", data = sequence.Topic ?? "" });
             await streamBuilder.WriteAsync(Encoding.UTF8.GetBytes($"topic: {topicJson}\n\n"));
-            savedThought.Sequence.Topic = topic;
         }

         var thoughtJson = JsonSerializer.Serialize(new { type = "thought", data = savedThought },
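The streaming handler above follows Semantic Kernel's manual function-invocation pattern: stream one assistant turn, collect any tool calls with FunctionCallContentBuilder, invoke them, feed the results back into the chat history, and loop until a turn produces no calls. The sketch below is a condensed view of that loop only (type and method names are taken from the diff; the SSE writes and SnThinkingMessagePart bookkeeping are omitted), not the actual controller code:

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.SemanticKernel;
    using Microsoft.SemanticKernel.ChatCompletion;

    static class ToolLoopSketch
    {
        // Condensed shape of the new ThoughtController loop, assuming
        // FunctionChoiceBehavior.Auto() in the execution settings.
        public static async Task RunAsync(
            IChatCompletionService chat, Kernel kernel,
            ChatHistory history, PromptExecutionSettings settings)
        {
            while (true)
            {
                var text = new StringBuilder();
                var callBuilder = new FunctionCallContentBuilder();

                await foreach (var update in chat.GetStreamingChatMessageContentsAsync(history, settings, kernel))
                {
                    if (update.Content is not null) text.Append(update.Content); // "text" SSE events go out here
                    if (update.Items.Count > 0) callBuilder.Append(update);      // accumulate partial tool-call updates
                }

                var calls = callBuilder.Build();
                if (calls.Count == 0) break; // no tool calls: this turn is the final answer

                // Echo the assistant turn (text + calls) into the history, then run each tool
                // and append its result so the next iteration can see it.
                var assistant = new ChatMessageContent(AuthorRole.Assistant, text.ToString());
                foreach (var call in calls) assistant.Items.Add(call);
                history.Add(assistant);

                foreach (var call in calls)
                {
                    FunctionResultContent result;
                    try { result = await call.InvokeAsync(kernel); }
                    catch (Exception ex) { result = new FunctionResultContent(call.Id!, ex.Message); }
                    history.Add(result.ToChatMessage());
                }
            }
        }
    }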

View File

@@ -109,22 +109,12 @@ public class ThoughtProvider
case "ollama": case "ollama":
return new OllamaPromptExecutionSettings return new OllamaPromptExecutionSettings
{ {
FunctionChoiceBehavior = FunctionChoiceBehavior.Auto( FunctionChoiceBehavior = FunctionChoiceBehavior.Auto()
options: new FunctionChoiceBehaviorOptions
{
AllowParallelCalls = true,
AllowConcurrentInvocation = true
})
}; };
case "deepseek": case "deepseek":
return new OpenAIPromptExecutionSettings return new OpenAIPromptExecutionSettings
{ {
FunctionChoiceBehavior = FunctionChoiceBehavior.Auto( FunctionChoiceBehavior = FunctionChoiceBehavior.Auto()
options: new FunctionChoiceBehaviorOptions
{
AllowParallelCalls = true,
AllowConcurrentInvocation = true
})
}; };
default: default:
throw new InvalidOperationException("Unknown provider: " + ModelProviderType); throw new InvalidOperationException("Unknown provider: " + ModelProviderType);

View File

@@ -11,8 +11,7 @@ namespace DysonNetwork.Insight.Thought;
 public class ThoughtService(
     AppDatabase db,
     ICacheService cache,
-    PaymentService.PaymentServiceClient paymentService,
-    WalletService.WalletServiceClient walletService
+    PaymentService.PaymentServiceClient paymentService
 )
 {
     public async Task<SnThinkingSequence?> GetOrCreateSequenceAsync(
@@ -39,38 +38,39 @@ public class ThoughtService(
     public async Task<SnThinkingThought> SaveThoughtAsync(
         SnThinkingSequence sequence,
-        string content,
+        List<SnThinkingMessagePart> parts,
         ThinkingThoughtRole role,
-        List<SnThinkingChunk>? chunks = null,
         string? model = null
     )
     {
         // Approximate token count (1 token ≈ 4 characters for GPT-like models)
-        var tokenCount = content?.Length / 4 ?? 0;
+        var totalChars = parts.Sum(part =>
+            (part.Type == ThinkingMessagePartType.Text ? part.Text?.Length : 0) ?? 0 +
+            (part.Type == ThinkingMessagePartType.FunctionCall ? part.FunctionCall?.Arguments.Length : 0) ?? 0
+        );
+        var tokenCount = totalChars / 4;

         var thought = new SnThinkingThought
         {
             SequenceId = sequence.Id,
-            Content = content,
+            Parts = parts,
             Role = role,
             TokenCount = tokenCount,
             ModelName = model,
-            Chunks = chunks ?? new List<SnThinkingChunk>(),
         };
         db.ThinkingThoughts.Add(thought);

         // Update sequence total tokens only for assistant responses
         if (role == ThinkingThoughtRole.Assistant)
             sequence.TotalToken += tokenCount;

         await db.SaveChangesAsync();

         // Invalidate cache for this sequence's thoughts
         await cache.RemoveGroupAsync($"sequence:{sequence.Id}");

         return thought;
     }

     public async Task<List<SnThinkingThought>> GetPreviousThoughtsAsync(SnThinkingSequence sequence)
     {
         var cacheKey = $"thoughts:{sequence.Id}";

View File

@@ -8,7 +8,7 @@ public class SnThinkingSequence : ModelBase
 {
     public Guid Id { get; set; } = Guid.NewGuid();

     [MaxLength(4096)] public string? Topic { get; set; }
     public long TotalToken { get; set; }
     public long PaidToken { get; set; }
@@ -21,33 +21,48 @@ public enum ThinkingThoughtRole
     User
 }

-public enum StreamingContentType
+public enum ThinkingMessagePartType
 {
     Text,
-    Reasoning,
     FunctionCall,
-    Unknown
+    FunctionResult
 }

-public class SnThinkingChunk
+public class SnThinkingMessagePart
 {
-    public StreamingContentType Type { get; set; }
-    public Dictionary<string, object>? Data { get; set; } = new();
+    public ThinkingMessagePartType Type { get; set; }
+    public string? Text { get; set; }
+    public SnFunctionCall? FunctionCall { get; set; }
+    public SnFunctionResult? FunctionResult { get; set; }
+}
+
+public class SnFunctionCall
+{
+    public string Id { get; set; } = null!;
+    public string Name { get; set; } = null!;
+    public string Arguments { get; set; } = null!;
+}
+
+public class SnFunctionResult
+{
+    public string CallId { get; set; } = null!;
+    public object Result { get; set; } = null!;
+    public bool IsError { get; set; }
 }

 public class SnThinkingThought : ModelBase
 {
     public Guid Id { get; set; } = Guid.NewGuid();
-    public string? Content { get; set; }
     [Column(TypeName = "jsonb")] public List<SnCloudFileReferenceObject> Files { get; set; } = [];
-    [Column(TypeName = "jsonb")] public List<SnThinkingChunk> Chunks { get; set; } = [];
+    [Column(TypeName = "jsonb")] public List<SnThinkingMessagePart> Parts { get; set; } = [];
     public ThinkingThoughtRole Role { get; set; }
     public long TokenCount { get; set; }
     [MaxLength(4096)] public string? ModelName { get; set; }

     public Guid SequenceId { get; set; }
     [JsonIgnore] public SnThinkingSequence Sequence { get; set; } = null!;
 }