Skip to content

Commit f2977b5

Browse files
committed
added QueryResultPacket
1 parent 54315e4 commit f2977b5

15 files changed

Lines changed: 701 additions & 26 deletions

src/SuperSocket.MySQL/MySQLConnection.cs

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
24
using System.Net;
35
using System.Net.Sockets;
46
using System.Security.Cryptography;
@@ -141,30 +143,83 @@ private byte[] GenerateAuthResponse(HandshakePacket handshakePacket)
141143
}
142144

143145
/// <summary>
144-
/// Executes a simple query (placeholder implementation)
146+
/// Executes a SQL query and returns the result
145147
/// </summary>
146148
/// <param name="query">The SQL query to execute</param>
147149
/// <param name="cancellationToken">Cancellation token</param>
148-
/// <returns>Task representing the async operation</returns>
149-
public async Task<string> ExecuteQueryAsync(string query, CancellationToken cancellationToken = default)
150+
/// <returns>QueryResult containing the query results</returns>
151+
public async Task<QueryResultPacket> ExecuteQueryAsync(string query, CancellationToken cancellationToken = default)
150152
{
151153
if (!IsAuthenticated)
152154
throw new InvalidOperationException("Connection is not authenticated. Call ConnectAsync first.");
153155

154156
if (string.IsNullOrEmpty(query))
155157
throw new ArgumentException("Query cannot be null or empty.", nameof(query));
156158

157-
// This is a placeholder implementation
158-
// In a complete implementation, you would:
159-
// 1. Create a COM_QUERY packet with the SQL query
160-
// 2. Send the packet to the server
161-
// 3. Receive and parse the result set
162-
// 4. Return the results
159+
try
160+
{
161+
// Create and send COM_QUERY command packet
162+
var commandPacket = new CommandPacket(MySQLCommand.COM_QUERY, query)
163+
{
164+
SequenceId = 0
165+
};
166+
167+
await SendAsync(PacketEncoder, commandPacket).ConfigureAwait(false);
163168

164-
await Task.Delay(10, cancellationToken); // Simulate async operation
165-
return "Query execution not fully implemented yet";
169+
// Read response
170+
var response = await ReceiveAsync().ConfigureAwait(false);
171+
172+
return (QueryResultPacket)response;
173+
}
174+
catch (Exception ex)
175+
{
176+
return QueryResultPacket.FromError(-1, ex.Message);
177+
}
166178
}
167179

180+
/// <summary>
181+
/// Executes a SQL query and returns a simple string representation
182+
/// </summary>
183+
/// <param name="query">The SQL query to execute</param>
184+
/// <param name="cancellationToken">Cancellation token</param>
185+
/// <returns>String representation of the results</returns>
186+
public async Task<string> ExecuteQueryStringAsync(string query, CancellationToken cancellationToken = default)
187+
{
188+
var result = await ExecuteQueryAsync(query, cancellationToken).ConfigureAwait(false);
189+
190+
if (!result.IsSuccess)
191+
{
192+
return $"Error {result.ErrorCode}: {result.ErrorMessage}";
193+
}
194+
195+
if (result.Columns == null || result.Columns.Count == 0)
196+
{
197+
return $"Query executed successfully. {result.AffectedRows} rows affected.";
198+
}
199+
200+
var sb = new StringBuilder();
201+
202+
// Add column headers
203+
sb.AppendLine(string.Join("\t", result.Columns));
204+
205+
// Add separator line
206+
sb.AppendLine(new string('-', result.Columns.Count * 10));
207+
208+
// Add data rows
209+
if (result.Rows != null)
210+
{
211+
foreach (var row in result.Rows)
212+
{
213+
sb.AppendLine(string.Join("\t", row ?? new string[result.Columns.Count]));
214+
}
215+
}
216+
217+
sb.AppendLine($"\n{result.Rows?.Count ?? 0} rows returned.");
218+
219+
return sb.ToString();
220+
}
221+
222+
168223
/// <summary>
169224
/// Disconnects from the MySQL server and resets authentication state
170225
/// </summary>

src/SuperSocket.MySQL/MySQLPacketDecoder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public MySQLPacket Decode(ref ReadOnlySequence<byte> buffer, object context)
4545

4646
var package = _packetFactory.Create(packetType);
4747

48-
package.Decode(ref reader, context);
48+
package = package.Decode(ref reader, context);
4949
package.SequenceId = sequenceId;
5050

5151
if (!filter.ReceivedHandshake)

src/SuperSocket.MySQL/MySQLPacketFactory.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ internal class MySQLPacketFactory : IMySQLPacketFactory
1010
{
1111
public static MySQLPacketFactory ClientInstance { get; }
1212

13+
private static readonly MySQLPacket unknownPacket = new UnknownPacket();
14+
1315
static MySQLPacketFactory()
1416
{
1517
ClientInstance = new MySQLPacketFactory()
@@ -37,7 +39,8 @@ public MySQLPacket Create(int packageType)
3739
{
3840
if (!_packetCreators.TryGetValue(packageType, out var creator))
3941
{
40-
throw new InvalidDataException($"No packet registered for package type {packageType}");
42+
//throw new InvalidDataException($"No packet registered for package type {packageType}");
43+
return unknownPacket;
4144
}
4245

4346
var packet = creator();
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Text;
4+
5+
namespace SuperSocket.MySQL.Packets
6+
{
7+
/// <summary>
8+
/// Represents a MySQL column definition packet
9+
/// Contains metadata about a column in the result set
10+
/// </summary>
11+
public class ColumnDefinitionPacket : MySQLPacket
12+
{
13+
public string Catalog { get; set; }
14+
public string Schema { get; set; }
15+
public string Table { get; set; }
16+
public string OrgTable { get; set; }
17+
public string Name { get; set; }
18+
public string OrgName { get; set; }
19+
public ushort NextLength { get; set; }
20+
public uint CharacterSet { get; set; }
21+
public uint ColumnLength { get; set; }
22+
public byte ColumnType { get; set; }
23+
public ushort Flags { get; set; }
24+
public byte Decimals { get; set; }
25+
26+
public ColumnDefinitionPacket()
27+
{
28+
}
29+
30+
protected internal override MySQLPacket Decode(ref SequenceReader<byte> reader, object context)
31+
{
32+
// Read catalog (length-encoded string)
33+
if (!reader.TryReadLengthEncodedString(out string catalog))
34+
throw new InvalidOperationException("Failed to read catalog");
35+
Catalog = catalog;
36+
37+
// Read schema (length-encoded string)
38+
if (!reader.TryReadLengthEncodedString(out string schema))
39+
throw new InvalidOperationException("Failed to read schema");
40+
Schema = schema;
41+
42+
// Read table (length-encoded string)
43+
if (!reader.TryReadLengthEncodedString(out string table))
44+
throw new InvalidOperationException("Failed to read table");
45+
Table = table;
46+
47+
// Read org_table (length-encoded string)
48+
if (!reader.TryReadLengthEncodedString(out string orgTable))
49+
throw new InvalidOperationException("Failed to read org_table");
50+
OrgTable = orgTable;
51+
52+
// Read name (length-encoded string)
53+
if (!reader.TryReadLengthEncodedString(out string name))
54+
throw new InvalidOperationException("Failed to read name");
55+
Name = name;
56+
57+
// Read org_name (length-encoded string)
58+
if (!reader.TryReadLengthEncodedString(out string orgName))
59+
throw new InvalidOperationException("Failed to read org_name");
60+
OrgName = orgName;
61+
62+
// Read next_length (length-encoded integer)
63+
NextLength = (ushort)reader.ReadLengthEncodedInteger();
64+
65+
// Read character_set (2 bytes)
66+
if (!reader.TryReadLittleEndian(out short characterSetShort))
67+
throw new InvalidOperationException("Failed to read character_set");
68+
CharacterSet = (uint)characterSetShort;
69+
70+
// Read column_length (4 bytes)
71+
if (!reader.TryReadLittleEndian(out int columnLengthInt))
72+
throw new InvalidOperationException("Failed to read column_length");
73+
ColumnLength = (uint)columnLengthInt;
74+
75+
// Read column_type (1 byte)
76+
if (!reader.TryRead(out byte columnType))
77+
throw new InvalidOperationException("Failed to read column_type");
78+
ColumnType = columnType;
79+
80+
// Read flags (2 bytes)
81+
if (!reader.TryReadLittleEndian(out short flagsShort))
82+
throw new InvalidOperationException("Failed to read flags");
83+
Flags = (ushort)flagsShort;
84+
85+
// Read decimals (1 byte)
86+
if (!reader.TryRead(out byte decimals))
87+
throw new InvalidOperationException("Failed to read decimals");
88+
Decimals = decimals;
89+
90+
// Skip the two null bytes that follow
91+
reader.Advance(2);
92+
93+
return this;
94+
}
95+
96+
protected internal override int Encode(IBufferWriter<byte> writer)
97+
{
98+
var bytesWritten = 0;
99+
100+
// Write catalog
101+
var catalogBytes = Encoding.UTF8.GetBytes(Catalog ?? "def");
102+
bytesWritten += writer.WriteLengthEncodedInteger((ulong)catalogBytes.Length);
103+
writer.Write(catalogBytes);
104+
bytesWritten += catalogBytes.Length;
105+
106+
// Write schema
107+
var schemaBytes = Encoding.UTF8.GetBytes(Schema ?? "");
108+
bytesWritten += writer.WriteLengthEncodedInteger((ulong)schemaBytes.Length);
109+
writer.Write(schemaBytes);
110+
bytesWritten += schemaBytes.Length;
111+
112+
// Write table
113+
var tableBytes = Encoding.UTF8.GetBytes(Table ?? "");
114+
bytesWritten += writer.WriteLengthEncodedInteger((ulong)tableBytes.Length);
115+
writer.Write(tableBytes);
116+
bytesWritten += tableBytes.Length;
117+
118+
// Write org_table
119+
var orgTableBytes = Encoding.UTF8.GetBytes(OrgTable ?? "");
120+
bytesWritten += writer.WriteLengthEncodedInteger((ulong)orgTableBytes.Length);
121+
writer.Write(orgTableBytes);
122+
bytesWritten += orgTableBytes.Length;
123+
124+
// Write name
125+
var nameBytes = Encoding.UTF8.GetBytes(Name ?? "");
126+
bytesWritten += writer.WriteLengthEncodedInteger((ulong)nameBytes.Length);
127+
writer.Write(nameBytes);
128+
bytesWritten += nameBytes.Length;
129+
130+
// Write org_name
131+
var orgNameBytes = Encoding.UTF8.GetBytes(OrgName ?? "");
132+
bytesWritten += writer.WriteLengthEncodedInteger((ulong)orgNameBytes.Length);
133+
writer.Write(orgNameBytes);
134+
bytesWritten += orgNameBytes.Length;
135+
136+
// Write next_length
137+
bytesWritten += writer.WriteLengthEncodedInteger(NextLength);
138+
139+
// Write character_set
140+
bytesWritten += writer.WriteUInt16((ushort)CharacterSet);
141+
142+
// Write column_length
143+
bytesWritten += writer.WriteUInt32(ColumnLength);
144+
145+
// Write column_type
146+
bytesWritten += writer.WriteUInt8(ColumnType);
147+
148+
// Write flags
149+
bytesWritten += writer.WriteUInt16(Flags);
150+
151+
// Write decimals
152+
bytesWritten += writer.WriteUInt8(Decimals);
153+
154+
// Write two null bytes
155+
bytesWritten += writer.WriteUInt16(0);
156+
157+
return bytesWritten;
158+
}
159+
}
160+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Text;
4+
5+
namespace SuperSocket.MySQL.Packets
6+
{
7+
/// <summary>
8+
/// Represents a MySQL command packet (COM_QUERY, COM_PING, etc.)
9+
/// </summary>
10+
internal class CommandPacket : MySQLPacket
11+
{
12+
public MySQLCommand Command { get; set; }
13+
14+
public string QueryText { get; set; }
15+
16+
public CommandPacket()
17+
{
18+
}
19+
20+
public CommandPacket(MySQLCommand command, string queryText = null)
21+
{
22+
Command = command;
23+
QueryText = queryText;
24+
}
25+
26+
protected internal override MySQLPacket Decode(ref SequenceReader<byte> reader, object context)
27+
{
28+
// Read command byte
29+
if (reader.TryRead(out byte commandByte))
30+
{
31+
Command = (MySQLCommand)commandByte;
32+
}
33+
34+
// Read query text if present
35+
if (reader.Remaining > 0)
36+
{
37+
var queryBytes = new byte[reader.Remaining];
38+
reader.TryCopyTo(queryBytes);
39+
reader.Advance(queryBytes.Length);
40+
QueryText = Encoding.UTF8.GetString(queryBytes);
41+
}
42+
43+
return this;
44+
}
45+
46+
protected internal override int Encode(IBufferWriter<byte> writer)
47+
{
48+
var bytesWritten = 0;
49+
50+
// Write command byte
51+
bytesWritten += writer.WriteUInt8((byte)Command);
52+
53+
// Write query text if present
54+
if (!string.IsNullOrEmpty(QueryText))
55+
{
56+
var queryBytes = Encoding.UTF8.GetBytes(QueryText);
57+
writer.Write(queryBytes);
58+
bytesWritten += queryBytes.Length;
59+
}
60+
61+
return bytesWritten;
62+
}
63+
}
64+
}

src/SuperSocket.MySQL/Packets/EOFPacket.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class EOFPacket : MySQLPacket, IPacketWithHeaderByte
1212

1313
public int Length { get; private set; }
1414

15-
protected internal override void Decode(ref SequenceReader<byte> reader, object context)
15+
protected internal override MySQLPacket Decode(ref SequenceReader<byte> reader, object context)
1616
{
1717
Length = (int)reader.Remaining;
1818

@@ -25,6 +25,8 @@ protected internal override void Decode(ref SequenceReader<byte> reader, object
2525
throw new InvalidOperationException("Failed to read StatusFlags from EOFPacket.");
2626

2727
StatusFlags = statusFlags;
28+
29+
return this;
2830
}
2931

3032
protected internal override int Encode(IBufferWriter<byte> writer)

0 commit comments

Comments
 (0)