Last active
June 21, 2024 08:16
-
-
Save shakir915/7ca31774a5bb5c54f89560945112710f to your computer and use it in GitHub Desktop.
YahooSocketFetcher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package shakir.bhav.common | |
import com.google.common.primitives.Ints | |
import kotlinx.coroutines.GlobalScope | |
import kotlinx.coroutines.launch | |
import okhttp3.* | |
import proto.My // proto file : https://gist.github.com/shakir915/f2f078058183a5beb4a853d7d6529de4 | |
import java.io.File | |
import java.io.FileOutputStream | |
import java.text.SimpleDateFormat | |
import java.util.* | |
import java.util.concurrent.TimeUnit | |
import kotlin.collections.ArrayList | |
import kotlin.concurrent.thread | |
object YahooSocketFetcher { | |
var saveRaw = true | |
val rawlist1 = arrayListOf<ByteArray>() | |
val rawlist2 = arrayListOf<ByteArray>() | |
var rawAddList = rawlist1 | |
var rawSaveList = rawlist1 | |
val appendFileOnArraySize by lazy { | |
settings.appendFileOnArraySize | |
} | |
fun kill() { | |
println("YahooSocketFetcher : kill() called") | |
socketEnabled = false | |
try { | |
webSocket?.close(1000, "") | |
} catch (e: Exception) { | |
e.printStackTrace() | |
} | |
} | |
fun addSymbol(list_i: List<String>) { | |
println("YahooSocketFetcher : addSymbol() called with: list_i = $list_i") | |
if (!list.containsAll(list_i)) { | |
list.addAll(list_i) | |
list.distinct()?.let { | |
list.clear() | |
list.addAll(it) | |
} | |
sendPendingForAddNewSymbol = true | |
} | |
} | |
var inisiationOnProgress = false | |
var sendPendingForAddNewSymbol = false | |
var functions: ArrayList<((Triple<String, Double, Long>) -> Unit)> = arrayListOf() | |
fun start(list: List<String>, function: (Triple<String, Double, Long>) -> Unit) { | |
println("YahooSocketFetcher : start() called with: list = $list, function = $function") | |
socketEnabled = true | |
this.functions.add(function) | |
addSymbol(list) | |
if (webSocket != null && System.currentTimeMillis() - lastOnMessageWithPriceTriggeredAtMilli <= 10000) { | |
sendPendingForAddNewSymbol = true | |
} else { | |
restart(100) | |
} | |
} | |
fun restart(delay: Long = 3000L) { | |
println("YahooSocketFetcher : restart() called with: delay = $delay") | |
if (!inisiationOnProgress) { | |
inisiationOnProgress = true | |
try { | |
webSocket?.close(1000, "") | |
} catch (e: Exception) { | |
} | |
Thread.sleep(delay) | |
if (socketEnabled) { | |
try { | |
val client = OkHttpClient.Builder() | |
.readTimeout(0, TimeUnit.MILLISECONDS) | |
.build() | |
val request = Request.Builder() | |
.url("wss://streamer.finance.yahoo.com/") | |
.build() | |
webSocket = client.newWebSocket(request, webSocketListener) | |
} catch (e: Exception) { | |
e.printStackTrace() | |
} | |
} | |
inisiationOnProgress = false | |
} | |
} | |
var socketEnabled = true | |
var webSocket: WebSocket? = null | |
var list: ArrayList<String> = arrayListOf() | |
var lastOnMessageWithPriceTriggeredAtMilli = 0L | |
val webSocketListener = object : WebSocketListener() { | |
override fun onMessage(webSocket: WebSocket, text: String) { | |
super.onMessage(webSocket, text) | |
try { | |
//println("webSocket onMessage $webSocket $text ") | |
val ba = Base64.getDecoder().decode(text) | |
My.PricingData.parseFrom(ba)?.let { | |
lastOnMessageWithPriceTriggeredAtMilli = System.currentTimeMillis() | |
//println("${it.id} ${it.shortName} ${it.underlyingSymbol} ${it.price} ${it.dayVolume} ${milliDisplay(it.time)}") | |
functions.forEach { function -> | |
val triple = Triple(it.id.replace(".NS", ""), it.price.toDouble(), it.time) | |
IndicatorCheck.check(triple) | |
function?.invoke(triple) | |
} | |
//println("proto ${it.underlyingSymbol} ${milliFileDateTime(it.time)} ${it.shortName} ${it.ask} ${it.bid} ${it.changePercent} ${it.currency} ${it.dayVolume}") | |
} | |
saveFile(ba) | |
} catch (e: Exception) { | |
e.printStackTrace() | |
} | |
if (sendPendingForAddNewSymbol) { | |
sendPendingForAddNewSymbol = false | |
send() | |
} | |
} | |
override fun onOpen(webSocket: WebSocket, response: Response) { | |
super.onOpen(webSocket, response) | |
println("webSocket onOpen ${YahooSocketFetcher.webSocket} $response ") | |
send() | |
} | |
fun send() { | |
try { | |
var s = "{\"subscribe\":[\"" | |
var sInetialLength = s.length | |
list.filter { it.isNotBlank() == true }.forEach { | |
if (s.length > sInetialLength) { | |
s += "\",\"${it}.NS" | |
} else { | |
s += "${it}.NS" | |
} | |
} | |
s += "\"]}" | |
webSocket?.send(s) | |
} catch (e: Exception) { | |
e.printStackTrace() | |
} | |
} | |
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { | |
super.onClosed(webSocket, code, reason) | |
println("webSocket onClosed $webSocket $code $reason") | |
restart() | |
} | |
override fun onClosing( | |
webSocket: WebSocket, | |
code: Int, | |
reason: String, | |
) { | |
super.onClosing(webSocket, code, reason) | |
println("webSocket onClosing $webSocket $code $reason") | |
restart() | |
} | |
override fun onFailure( | |
webSocket: WebSocket, | |
t: Throwable, | |
response: Response?, | |
) { | |
super.onFailure(webSocket, t, response) | |
println("webSocket onFailure $webSocket $t $response") | |
restart() | |
} | |
override fun onMessage(webSocket: WebSocket, bytes: okio.ByteString) { | |
super.onMessage(webSocket, bytes) | |
//println("webSocket onMessage $webSocket $bytes") | |
} | |
} | |
fun saveFile(ba: ByteArray?, function: (() -> Unit)? = null) { | |
if (saveRaw) { | |
if (ba != null) | |
rawAddList.add(ba) | |
//println("rawAddList ${rawAddList.size}") | |
if (ba == null || rawAddList.size == appendFileOnArraySize) { | |
rawSaveList = rawAddList | |
thread { | |
getDataFolder("YahooSocketBinaryPartFiles", milliFileDateTime(System.currentTimeMillis())).apply { | |
rawSaveList.forEach { | |
appendBytes(Ints.toByteArray(it.size)) | |
appendBytes(it) | |
} | |
rawSaveList.clear() | |
} | |
function?.invoke() | |
} | |
if (rawAddList == rawlist1) { | |
rawAddList = rawlist2 | |
} else { | |
rawAddList = rawlist1 | |
} | |
} | |
} | |
} | |
fun partFileToSingleFileAndUploadToTG(logBack: ((s: String?) -> Unit)? = null) { | |
kill() | |
saveFile(null) { | |
val dates = getDataFolder("YahooSocketBinaryPartFiles").list().map { it.split("__")?.getOrNull(0) }.filterNotNull().distinct() | |
dates.forEach { date -> | |
val files = getDataFolder("YahooSocketBinaryPartFiles").listFiles().filter { it.name.startsWith(date) }.toTypedArray() | |
getDataFolder("YahooSocketBinary", "YahooSocketBinary__" + date).appendAll( | |
files = files.sortedBy { it.name }.toTypedArray() | |
) | |
files.forEach { | |
it.delete() | |
} | |
} | |
GlobalScope.launch { | |
TGUpload.upload(dates.map { getDataFolder("YahooSocketBinary", "YahooSocketBinary__" + it) }) | |
logBack?.invoke("DONE") | |
} | |
} | |
} | |
fun File.appendAll(bufferSize: Int = 4096, vararg files: File) { | |
// if (!exists()) { | |
// throw NoSuchFileException(this, null, "File doesn't exist.") | |
// } | |
require(!isDirectory) { "The file is a directory." } | |
FileOutputStream(this, true).use { output -> | |
for (file in files) { | |
if (file.isDirectory || !file.exists()) { | |
continue // Might want to log or throw | |
} | |
file.forEachBlock(bufferSize) { buffer, bytesRead -> output.write(buffer, 0, bytesRead) } | |
} | |
} | |
} | |
/* | |
fun process(text: String) { | |
val f1l = ba[1] | |
var f1v = String(Arrays.copyOfRange(ba, 2, 2 + f1l)) | |
if (f1v.contains(".NS") || true) { | |
f1v = f1v.replace(".NS", "") | |
val f2v = ByteBuffer.wrap(Arrays.copyOfRange(ba, 3 + f1l, 7 + f1l)).order(ByteOrder.LITTLE_ENDIAN).getFloat() | |
println("$f1v $f2v") | |
} | |
}*/ | |
fun testSaved() { | |
IndicatorCheck.pushEnabled=false | |
getDataFolder("YahooSocketBinary").listFiles().forEach { file -> | |
val ci = Calendar.getInstance().apply { | |
timeInMillis = SimpleDateFormat("yyyy_MM_dd", Locale.ENGLISH) | |
.parse(file.name.replace("YahooSocketBinary__", "")).time | |
set(Calendar.HOUR_OF_DAY, 5) | |
set(Calendar.MINUTE, 0) | |
set(Calendar.SECOND, 0) | |
set(Calendar.MILLISECOND, 0) | |
} | |
val ohlcp = getOHLCPivotList(ci) | |
IndicatorCheck.restart(ohlcp) | |
val sizeByteArray = ByteArray(4) | |
val arraylist= arrayListOf<Triple<String,Double,Long>>() | |
file.inputStream().buffered().use { input -> | |
while (true) { | |
var b = input.read(sizeByteArray) | |
if (b <= 0) break | |
val valueByteArray = ByteArray(Ints.fromByteArray(sizeByteArray)) | |
b = input.read(valueByteArray) | |
if (b <= 0) break | |
My.PricingData.parseFrom(valueByteArray)?.let { | |
arraylist.add(Triple(it.id.replace(".NS", ""), it.price.toDouble(), it.time)) | |
// println("PricingData ${ }.") | |
} | |
} | |
} | |
arraylist.sortedBy { it.third }.forEach { | |
IndicatorCheck.check(it) | |
} | |
} | |
IndicatorCheck.pushEnabled=true | |
} | |
fun addToMinuteCandle(pb: Triple<String, Double, Long>, o: MutableList<Float>, h: MutableList<Float>, l: MutableList<Float>, c: MutableList<Float>, v: ArrayList<Long>, t: ArrayList<Long>) { | |
val seconds_nearest_minute = Calendar.getInstance().apply { | |
timeInMillis = pb.third | |
set(Calendar.SECOND, 0) | |
set(Calendar.MILLISECOND, 0) | |
}.timeInMillis.div(1000) | |
val i = t.indexOfFirst { it == seconds_nearest_minute } | |
if (t.getOrNull(i) != null) { | |
if (pb.second > h[i]) | |
h[i] = pb.second.toFloat() | |
else if (pb.second < l[i]) | |
l[i] = pb.second.toFloat() | |
c[i] = pb.second.toFloat() | |
} else { | |
t.add(seconds_nearest_minute) | |
h.add(pb.second.toFloat()) | |
o.add(pb.second.toFloat()) | |
l.add(pb.second.toFloat()) | |
c.add(pb.second.toFloat()) | |
v.add(0) | |
} | |
} | |
fun addToDayCandle(pb: Triple<String, Double, Long>, o: MutableList<Float>, h: MutableList<Float>, l: MutableList<Float>, c: MutableList<Float>, v: ArrayList<Long>, t: ArrayList<Long>) { | |
//todo | |
val seconds_nearest_minute = Calendar.getInstance().apply { | |
timeInMillis = pb.third | |
set(Calendar.SECOND, 0) | |
set(Calendar.MILLISECOND, 0) | |
}.timeInMillis.div(1000) | |
val i = t.indexOfFirst { it == seconds_nearest_minute } | |
if (t.getOrNull(i) != null) { | |
if (pb.second > h[i]) | |
h[i] = pb.second.toFloat() | |
else if (pb.second < l[i]) | |
l[i] = pb.second.toFloat() | |
c[i] = pb.second.toFloat() | |
} else { | |
t.add(seconds_nearest_minute) | |
h.add(pb.second.toFloat()) | |
o.add(pb.second.toFloat()) | |
l.add(pb.second.toFloat()) | |
c.add(pb.second.toFloat()) | |
v.add(0) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
proto file
https://gist.github.com/shakir915/f2f078058183a5beb4a853d7d6529de4