storage api write (indy-singh): 158 lines, 23 removals, 25 additions
-private async Task ExpensiveStreamingInsert()
+private async Task StorageWriteApi()
 {
     // max batch insert is 10,000.
     var length = Math.Min(MAX_BATCH_SIZE, _concurrentQueue.Count);
     // capacity is not length! This is here to pre-alloc the underlying array, so that it isn't needlessly resized
     var backup = new List<Tuple<WatchtowerBigQueryModel.Fields, WatchtowerBigQueryModel.Counters>>(capacity: length);
     var localConcurrentDictionary = new ConcurrentDictionary<WatchtowerBigQueryModel.Fields, WatchtowerBigQueryModel.Counters>();
-    var list = new List<BigQueryInsertRow>(capacity: length);
+    var list = new List<WatchtowerRecord>(capacity: length);
     for (int i = 0; i < length; i++)
     {
         if (_concurrentQueue.TryDequeue(out var item))
         {
             (WatchtowerBigQueryModel.Fields key, WatchtowerBigQueryModel.Counters value) = item;
             WatchtowerBigQueryModel.Counters UpdateValueFactory(WatchtowerBigQueryModel.Fields existingKey, WatchtowerBigQueryModel.Counters existingValue)
             {
                 existingValue.TotalDuration += value.TotalDuration;
                 existingValue.TotalSquareDuration += value.TotalSquareDuration;
                 existingValue.RequestBytes += value.RequestBytes;
                 existingValue.ResponseBytes += value.ResponseBytes;
                 existingValue.PgSessions += value.PgSessions;
                 existingValue.SqlSessions += value.SqlSessions;
                 existingValue.PgStatements += value.PgStatements;
                 existingValue.SqlStatements += value.SqlStatements;
                 existingValue.PgEntities += value.PgEntities;
                 existingValue.SqlEntities += value.SqlEntities;
                 existingValue.CassandraStatements += value.CassandraStatements;
                 existingValue.Hits += value.Hits;
                 return existingValue;
             }
             localConcurrentDictionary.AddOrUpdate(key, value, UpdateValueFactory);
             backup.Add(item);
         }
     }
     foreach (var pair in localConcurrentDictionary)
     {
-        list.Add(new BigQueryInsertRow
-        {
-            new WatchtowerBigQueryModel().ToInsertRowApi(pair.Key, pair.Value)
-        });
+        list.Add(new WatchtowerBigQueryModel().ToStorageWriteApi(pair.Key, pair.Value));
     }
     _lastSent = DateTime.UtcNow;
     var dropThreshold = Math.Min(MAX_BACK_PRESSURE, Settings.Config.IntFrom("OtherBigQueryStatsSaverDropThreshold"));
     // dropping the remaining queue over the max back pressure size is important to maintain the health of the cluster (e.g. memory & cpu)
     // It takes on average 647.9176121302088856 ms (n of 9431) to send a full batch of 10,000 rows to big query
     // the largest dropped remaining queue we've seen is 89,611 which occurred at 2021-03-03 00:56:00.042608+00
     // calculations are very bursty in load, we seem to do a consistent load and then have a massive spike for a short few seconds
     // so to begin with I'm setting the max back pressure to 40,000, this is AFTER we have taken the maximum possible from the queue
     // so in effect, the entire limit is 50,000
     if (_concurrentQueue.Count > dropThreshold)
     {
         // instead of dropping the entire queue, we now only drop the number beyond the threshold.
         // previously if the max back pressure was set to 40,000, and the back pressure had reached 41,234 we would have dropped all 41,234 items
         // now we only drop the items past that threshold, so 1,234
         var numberOfItemsToDequeue = _concurrentQueue.Count - dropThreshold;
         _thirdPartyLogger.Error(new LogDetails($"Dropping {numberOfItemsToDequeue}")
         {
             ObjectDump = new
             {
                 total = _concurrentQueue.Count,
                 dropThreshold = dropThreshold,
                 numberOfItemsToDequeue = numberOfItemsToDequeue
             }
         });
         for (var i = 0; i < numberOfItemsToDequeue; i++)
         {
             _concurrentQueue.TryDequeue(out _);
         }
     }
     // Only send if the config is enabled and we managed to connect to BigQuery
     // The above happens regardless to prevent a memory leak
-    if (Settings.Config.IsEnabled("UseBigQueryForWatchtower") && _bigQueryTable is object)
+    if (Settings.Config.IsEnabled("UseBigQueryForWatchtower") && _appendRowsStream is object)
     {
         var sw = Stopwatch.StartNew();
         try
         {
             // this does not use the start-stop cancellation token because we want to give big query a maximum of 5 seconds to batch insert
-            await _bigQueryTable.InsertRowsAsync(list, null, new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token);
+            var protoData = new AppendRowsRequest.Types.ProtoData
+            {
+                WriterSchema = _writerSchema,
+                Rows = new ProtoRows
+                {
+                    // https://github.com/protocolbuffers/protobuf/issues/12217 -> ToByteString memory leak
+                    SerializedRows = { list.Select(x => x.ToByteString()) },
+                },
+            };
+            await _appendRowsStream.WriteAsync(new AppendRowsRequest
+            {
+                ProtoRows = protoData,
+                WriteStream = _writeStreamName,
+            });
             if (Settings.Config.IsEnabled("OtherBigQueryStatsSaverLogsOn"))
             {
                 Log.Warn(new LogDetails($"Aggregated {length} individual saves to be {localConcurrentDictionary.Count} saves")
                 {
                     ObjectDump = new
                     {
                         count = list.Count,
                         duration = sw.Elapsed.TotalMilliseconds,
                         remaining = _concurrentQueue.Count,
                         beforeAggr = length,
                         afterAggr = localConcurrentDictionary.Count,
+                        sizeOfRequest = list.Sum(x => x.CalculateSize())
                     }
                 });
             }
         }
         catch (TaskCanceledException exception)
         {
             _thirdPartyLogger.Error(new LogDetails("It took longer than five seconds to send calculation stats to big query. Items will be re-queued")
             {
                 ObjectDump = new
                 {
                     count = list.Count,
                     duration = sw.Elapsed.TotalMilliseconds,
                     remaining = _concurrentQueue.Count,
                     beforeAggr = length,
                     afterAggr = localConcurrentDictionary.Count,
                 },
                 Exception = exception
             });
             ReQueue(backup);
         }
         catch (Exception exception)
         {
             _thirdPartyLogger.Error(new LogDetails("A general error occurred sending calculation stats to big query. Items will be re-queued")
             {
                 ObjectDump = new
                 {
                     count = list.Count,
                     duration = sw.Elapsed.TotalMilliseconds,
                     remaining = _concurrentQueue.Count,
                     beforeAggr = length,
                     afterAggr = localConcurrentDictionary.Count,
                 },
                 Exception = exception
             });
             ReQueue(backup);
         }
     }
 }
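
Two pieces of plumbing that the new version leans on are not shown in the diff: the one-time Storage Write API connection setup behind _appendRowsStream, _writerSchema and _writeStreamName, and the row conversion ToStorageWriteApi. As a rough sketch of the setup, assuming the Google.Cloud.BigQuery.Storage.V1 package and a protobuf-generated WatchtowerRecord message (the project, dataset and table names here are placeholders, not from the original):

using System.Threading.Tasks;
using Google.Cloud.BigQuery.Storage.V1;

// Sketch only: one-time initialisation, e.g. when the stats saver starts up.
private BigQueryWriteClient.AppendRowsStream _appendRowsStream;
private ProtoSchema _writerSchema;
private string _writeStreamName;

private async Task ConnectToBigQueryAsync()
{
    var client = await BigQueryWriteClient.CreateAsync();

    // The "_default" stream gives at-least-once, immediately-committed writes,
    // which fits the requeue-on-failure error handling above.
    _writeStreamName = WriteStreamName
        .FromProjectDatasetTableStream("my-project", "my_dataset", "watchtower", "_default")
        .ToString();

    // The writer schema is derived from the generated protobuf message and is
    // sent alongside the serialized rows (recent Google.Protobuf versions
    // expose Descriptor.ToProto()).
    _writerSchema = new ProtoSchema
    {
        ProtoDescriptor = WatchtowerRecord.Descriptor.ToProto(),
    };

    // One long-lived bidirectional connection; each batch is one WriteAsync call.
    _appendRowsStream = client.AppendRows();
}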
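
And a hypothetical shape for the conversion that replaces ToInsertRowApi: instead of building a loosely-typed BigQueryInsertRow, it populates the protobuf message whose ToByteString() and CalculateSize() methods the send path and the sizeOfRequest log field rely on. The counter fields are inferred from the diff; the identifying fields on key are elided because they are not shown in the original:

// Sketch only: the real method lives in WatchtowerBigQueryModel.
public WatchtowerRecord ToStorageWriteApi(
    WatchtowerBigQueryModel.Fields key,
    WatchtowerBigQueryModel.Counters value)
{
    return new WatchtowerRecord
    {
        // ...identifying fields copied from `key` would go here...
        TotalDuration = value.TotalDuration,
        TotalSquareDuration = value.TotalSquareDuration,
        RequestBytes = value.RequestBytes,
        ResponseBytes = value.ResponseBytes,
        PgSessions = value.PgSessions,
        SqlSessions = value.SqlSessions,
        PgStatements = value.PgStatements,
        SqlStatements = value.SqlStatements,
        PgEntities = value.PgEntities,
        SqlEntities = value.SqlEntities,
        CassandraStatements = value.CassandraStatements,
        Hits = value.Hits,
    };
}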