Skip to content

Commit 3b497a9

Browse files
committed
feat: writer for mz_parquet v0.2 format
1 parent 95c7e22 commit 3b497a9

1 file changed

Lines changed: 130 additions & 195 deletions

File tree

Writer/ParquetSpectrumWriter.cs

Lines changed: 130 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -1,257 +1,192 @@
11
using System;
22
using System.Collections.Generic;
33
using System.IO;
4-
using System.Linq;
54
using System.Reflection;
65
using log4net;
7-
using Parquet;
8-
using Parquet.Data;
6+
using Parquet.Serialization;
97
using ThermoFisher.CommonCore.Data;
108
using ThermoFisher.CommonCore.Data.Business;
119
using ThermoFisher.CommonCore.Data.FilterEnums;
1210
using ThermoFisher.CommonCore.Data.Interfaces;
13-
using ThermoRawFileParser.Writer.MzML;
1411

1512
namespace ThermoRawFileParser.Writer
1613
{
14+
15+
struct MzParquet
16+
{
17+
public uint scan;
18+
public uint level;
19+
public float rt;
20+
public float mz;
21+
public float intensity;
22+
public float? ion_mobility;
23+
public float? isolation_lower;
24+
public float? isolation_upper;
25+
public uint? precursor_scan;
26+
public float? precursor_mz;
27+
public uint? precursor_charge;
28+
}
29+
1730
public class ParquetSpectrumWriter : SpectrumWriter
1831
{
1932
private static readonly ILog Log =
2033
LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
2134

22-
private IRawDataPlus _rawFile;
23-
24-
// Dictionary to keep track of the different mass analyzers (key: Thermo MassAnalyzerType; value: the reference string)
25-
private readonly Dictionary<MassAnalyzerType, string> _massAnalyzers =
26-
new Dictionary<MassAnalyzerType, string>();
27-
28-
private readonly Dictionary<IonizationModeType, CVParamType> _ionizationTypes =
29-
new Dictionary<IonizationModeType, CVParamType>();
30-
3135
public ParquetSpectrumWriter(ParseInput parseInput) : base(parseInput)
3236
{
3337
}
3438

35-
public override void Write(IRawDataPlus rawFile, int firstScanNumber, int lastScanNumber)
39+
public override void Write(IRawDataPlus raw, int firstScanNumber, int lastScanNumber)
3640
{
37-
_rawFile = rawFile;
38-
if (rawFile.HasMsData)
39-
{
40-
List<PScan> pScans = new List<PScan>();
41-
WritePScans(ParseInput.OutputDirectory, rawFile.FileName, rawFile, pScans);
42-
}
43-
else
41+
if (!raw.HasMsData)
4442
{
4543
throw new RawFileParserException("No MS data in RAW file, no output will be produced");
4644
}
47-
}
4845

49-
private static void WritePScans(string outputDirectory, string fileName,
50-
IRawDataPlus raw,
51-
List<PScan> scans)
52-
{
5346
var enumerator = raw.GetFilteredScanEnumerator(" ");
5447

55-
foreach (var scanNumber in enumerator
56-
) // note in my tests serial is faster than Parallel.Foreach() (this involves disk access, so it makes sense)
57-
{
58-
//trailer information is extracted via index
59-
var trailers = raw.GetTrailerExtraValues(scanNumber);
60-
var trailerLabels = raw.GetTrailerExtraInformation(scanNumber);
61-
object chargeState = 0;
62-
for (int i = 0; i < trailerLabels.Labels.Length; i++)
63-
{
64-
if (trailerLabels.Labels[i] == "Charge State:")
65-
{
66-
chargeState = raw.GetTrailerExtraValue(scanNumber, i);
67-
break;
68-
}
69-
}
48+
// NB: replace with more robust strategy?
49+
var output = ParseInput.OutputDirectory + "//" + Path.GetFileNameWithoutExtension(ParseInput.RawFilePath) + ".mzparquet";
50+
51+
ParquetSerializerOptions opts = new ParquetSerializerOptions();
52+
opts.CompressionLevel = System.IO.Compression.CompressionLevel.Fastest;
53+
opts.CompressionMethod = Parquet.CompressionMethod.Zstd;
54+
55+
var data = new List<MzParquet>();
56+
57+
// map last (msOrder - 1) -> scan number (e.g. mapping precursors)
58+
// note, this assumes time dependence of MS1 -> MS2 -> MSN
59+
var last_scans = new Dictionary<int, uint>();
7060

61+
62+
foreach (var scanNumber in enumerator)
63+
{
7164
var scanFilter = raw.GetFilterForScanNumber(scanNumber);
72-
var scanStats = raw.GetScanStatsForScanNumber(scanNumber);
7365

7466
CentroidStream centroidStream = new CentroidStream();
7567

76-
//check for FT mass analyzer data
68+
// Pull out m/z and intensity values
69+
// NB: is this the best way to do this?
7770
if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerFTMS)
7871
{
7972
centroidStream = raw.GetCentroidStream(scanNumber, false);
8073
}
81-
82-
//check for IT mass analyzer data
83-
if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS)
74+
else if (scanFilter.MassAnalyzer == MassAnalyzerType.MassAnalyzerITMS)
8475
{
8576
var scanData = raw.GetSimplifiedScan(scanNumber);
8677
centroidStream.Masses = scanData.Masses;
8778
centroidStream.Intensities = scanData.Intensities;
8879
}
89-
90-
var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder;
91-
92-
if (msOrder == MSOrderType.Ms)
80+
else
9381
{
94-
var pscan = GetPScan(scanStats, centroidStream, fileName, Convert.ToInt32(chargeState));
95-
scans.Add(pscan);
82+
var scanData = raw.GetSimplifiedCentroids(scanNumber);
83+
centroidStream.Masses = scanData.Masses;
84+
centroidStream.Intensities = scanData.Intensities;
9685
}
9786

98-
if (msOrder == MSOrderType.Ms2)
99-
{
100-
var precursorMz = raw.GetScanEventForScanNumber(scanNumber).GetReaction(0).PrecursorMass;
101-
var pscan = GetPScan(scanStats, centroidStream, fileName, precursorMz,
102-
Convert.ToInt32(chargeState));
103-
scans.Add(pscan);
104-
}
10587

106-
var t = raw.GetTrailerExtraValues(scanNumber);
107-
}
88+
var msOrder = raw.GetScanEventForScanNumber(scanNumber).MSOrder;
10889

109-
WriteScans(outputDirectory, scans, fileName);
110-
}
90+
last_scans[(int)msOrder] = (uint)scanNumber;
11191

112-
private static PScan GetPScan(ScanStatistics scanStats, CentroidStream centroidStream,
113-
string fileName, double? precursorMz = null, int? precursorCharge = null)
114-
{
115-
var scan = new PScan
116-
{
117-
FileName = fileName,
118-
BasePeakMass = scanStats.BasePeakMass,
119-
ScanType = scanStats.ScanType,
120-
BasePeakIntensity = scanStats.BasePeakIntensity,
121-
PacketType = scanStats.PacketType,
122-
ScanNumber = scanStats.ScanNumber,
123-
RetentionTime = scanStats.StartTime,
124-
Masses = centroidStream.Masses.ToArray(),
125-
Intensities = centroidStream.Intensities.ToArray(),
126-
LowMass = scanStats.LowMass,
127-
HighMass = scanStats.HighMass,
128-
TIC = scanStats.TIC,
129-
FileId = fileName,
130-
PrecursorMz = precursorMz,
131-
PrecursorCharge = precursorCharge,
132-
MsOrder = 1
133-
};
134-
return scan;
135-
}
92+
double rt = raw.RetentionTimeFromScanNumber(scanNumber);
93+
float? isolation_lower = null;
94+
float? isolation_upper = null;
95+
uint? precursor_scan = null;
96+
float? precursor_mz = null;
97+
uint? precursor_charge = null;
13698

137-
public static void WriteScans(string outputDirectory, List<PScan> scans, string sourceRawFileName)
138-
{
139-
throw new NotImplementedException();
99+
if ((int)msOrder > 1)
100+
{
101+
var rx = scanFilter.GetReaction(0);
140102

141-
//Needs refactoring since Parquet.NET API changed
142-
143-
//var output = outputDirectory + "//" + Path.GetFileNameWithoutExtension(sourceRawFileName);
103+
// this assumes symmetrical quad window
104+
isolation_lower = (float)(rx.PrecursorMass - rx.IsolationWidth / 2);
105+
isolation_upper = (float)(rx.PrecursorMass + rx.IsolationWidth / 2);
106+
precursor_mz = (float)rx.PrecursorMass;
144107

145-
//var ds = new DataSet(new DataField<double>("BasePeakIntensity"),
146-
// new DataField<double>("BasePeakMass"),
147-
// new DataField<double[]>("Baselines"),
148-
// new DataField<double[]>("Charges"),
149-
// new DataField<string>("FileId"),
150-
// new DataField<string>("FileName"),
151-
// new DataField<double>("HighMass"),
152-
// new DataField<double[]>("Intensities"),
153-
// new DataField<double>("LowMass"),
154-
// new DataField<double[]>("Masses"),
155-
// new DataField<int>("MsOrder"),
156-
// new DataField<double[]>("Noises"),
157-
// new DataField<int>("PacketType"),
158-
// new DataField<int?>("PrecursorCharge"),
159-
// new DataField<double?>("PrecursorMass"),
160-
// new DataField<double[]>("Resolutions"),
161-
// new DataField<double>("RetentionTime"),
162-
// new DataField<int>("ScanNumber"),
163-
// new DataField<string>("ScanType"),
164-
// new DataField<double>("TIC"));
108+
// Try to retrieve last scan that occurred at the previous msOrder
109+
uint t;
110+
if (last_scans.TryGetValue((int)msOrder - 1, out t))
111+
{
112+
precursor_scan = t;
113+
}
114+
}
165115

166-
//foreach (var scan in scans)
167-
//{
168-
// //we can't store null values?
169-
// double[] dummyVal = new double[1];
170-
// if (scan.Noises == null)
171-
// {
172-
// scan.Noises = dummyVal;
173-
// }
116+
var trailer = raw.GetTrailerExtraInformation(scanNumber);
117+
for (var i = 0l; i < trailer.Length; i++)
118+
{
174119

175-
// if (scan.Charges == null)
176-
// {
177-
// scan.Charges = dummyVal;
178-
// }
120+
if (trailer.Labels[i].StartsWith("Monoisotopic M/Z"))
121+
{
122+
var val = float.Parse(trailer.Values[i]);
123+
if (val > 0)
124+
{
125+
precursor_mz = val;
126+
}
127+
}
179128

180-
// if (scan.Baselines == null)
181-
// {
182-
// scan.Baselines = dummyVal;
183-
// }
129+
// Overwrite precursor_scan with value from trailer, if it exists
130+
if (trailer.Labels[i].StartsWith("Master Scan"))
131+
{
132+
var val = Int64.Parse(trailer.Values[i]);
133+
if (val > 0)
134+
{
135+
precursor_scan = (uint)val;
136+
}
137+
}
184138

185-
// if (scan.Resolutions == null)
186-
// {
187-
// scan.Resolutions = dummyVal;
188-
// }
139+
if (trailer.Labels[i].StartsWith("Charge"))
140+
{
141+
var val = uint.Parse(trailer.Values[i]);
142+
if (val > 0)
143+
{
144+
precursor_charge = val;
145+
}
146+
}
147+
}
189148

190-
// if (scan.PrecursorMz == null)
191-
// {
192-
// scan.PrecursorMz = 0;
193-
// scan.PrecursorCharge = 0;
194-
// }
149+
// Add a row to parquet file for every m/z value in this scan
150+
for (int i = 0; i < centroidStream.Masses.Length; i++)
151+
{
152+
MzParquet m;
153+
m.rt = (float)rt;
154+
m.scan = (uint)scanNumber;
155+
m.level = ((uint)msOrder);
156+
m.intensity = (float)centroidStream.Intensities[i];
157+
m.mz = (float)centroidStream.Masses[i];
158+
m.isolation_lower = isolation_lower;
159+
m.isolation_upper = isolation_upper;
160+
m.precursor_scan = precursor_scan;
161+
m.precursor_mz = precursor_mz;
162+
m.precursor_charge = precursor_charge;
163+
m.ion_mobility = null;
164+
data.Add(m);
165+
}
166+
167+
// If we have enough ions to write a row group, do so
168+
// - some row groups might have more than this number of ions
169+
// but this ensures that all ions from a single scan are always
170+
// present in the same row group (critical property of mzparquet)
171+
if (data.Count >= 1_048_576)
172+
{
173+
var task = ParquetSerializer.SerializeAsync(data, output, opts);
174+
task.Wait();
175+
opts.Append = true;
176+
data.Clear();
177+
Log.Info("writing row group");
178+
}
195179

196-
// ds.Add(scan.BasePeakIntensity,
197-
// scan.BasePeakMass,
198-
// scan.Baselines,
199-
// scan.Charges,
200-
// scan.FileId,
201-
// scan.FileName,
202-
// scan.HighMass,
203-
// scan.Intensities,
204-
// scan.LowMass,
205-
// scan.Masses,
206-
// scan.MsOrder,
207-
// scan.Noises,
208-
// scan.PacketType,
209-
// scan.PrecursorCharge,
210-
// scan.PrecursorMz,
211-
// scan.Resolutions,
212-
// scan.RetentionTime,
213-
// scan.ScanNumber,
214-
// scan.ScanType,
215-
// scan.TIC);
216-
//}
180+
}
217181

218-
//using (Stream fileStream = File.OpenWrite(output + ".parquet"))
219-
//{
220-
// using (var writer = new ParquetWriter(fileStream))
221-
// {
222-
// writer.Write(ds);
223-
// }
224-
//}
182+
// serialize any remaining ions into the final row group
183+
if (data.Count > 0)
184+
{
185+
var task = ParquetSerializer.SerializeAsync(data, output, opts);
186+
task.Wait();
187+
Log.Info("writing row group");
188+
}
225189
}
226190
}
227191

228-
/// PSCan meaing Parsec Scan (because Commoncore has a Scan class)
229-
/// </summary>
230-
public class PScan
231-
{
232-
/// <summary>
233-
/// Unique ID per file (foreign key in data store)
234-
/// </summary>
235-
public string FileId { get; set; }
236-
237-
public string FileName { get; set; }
238-
public double BasePeakIntensity { get; set; }
239-
public double BasePeakMass { get; set; }
240-
public double[] Baselines { get; set; }
241-
public double[] Charges { get; set; }
242-
public double HighMass { get; set; }
243-
public double[] Intensities { get; set; }
244-
public double LowMass { get; set; }
245-
public double[] Masses { get; set; }
246-
public double[] Noises { get; set; }
247-
public int PacketType { get; set; } // : 20,
248-
public double RetentionTime { get; set; }
249-
public double[] Resolutions { get; set; }
250-
public int ScanNumber { get; set; }
251-
public string ScanType { get; set; } // : FTMS + c ESI d Full ms2 335.9267@hcd30.00 [130.0000-346.0000],
252-
public double TIC { get; set; }
253-
public int MsOrder { get; set; }
254-
public double? PrecursorMz { get; set; }
255-
public int? PrecursorCharge { get; set; }
256-
}
257192
}

0 commit comments

Comments
 (0)