Last active
April 17, 2016 14:37
-
-
Save peko/605238bd08f6d39304fc9d62b26be73f to your computer and use it in GitHub Desktop.
Hashtag aggregator for Instagram, VK & Twitter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Example driver for ./hashtag-aggregator: connects to Postgres, then starts
# one aggregator per network. All three poll the same hashtag with the same
# cache settings; each one writes to its own table (named by `name`).
# The empty strings below are placeholders to fill in before running.
pg  = require "pg"
hag = require "./hashtag-aggregator"

client = new pg.Client "postgres://"

await client.connect defer err
if err
  console.error 'could not connect to postgres', err
  process.exit 1
console.log 'Connected to postgres'

# Shared settings. The aggregator only fills defaults for null/undefined
# values, so an empty string here is used verbatim.
hashtag    = ""
cache_url  = ""
cache_path = ""

insta = new hag.Instagram client, {
  name: "insta_test"
  hashtag
  cache_url
  cache_path
  instagram_cid: ""
  instagram_sec: ""
}

vk = new hag.Vkontakte client, {
  name: "vk_test"
  hashtag
  cache_url
  cache_path
}

twitter = new hag.Twitter client, {
  name: "twitter_test"
  hashtag
  cache_url
  cache_path
  oauth:
    consumer_key       : ''
    consumer_secret    : ''
    access_token_key   : ''
    access_token_secret: ''
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mkdirp = require "mkdirp" | |
colors = require "colors" | |
query = require "querystring" | |
request = require "request" | |
md5 = require "md5" | |
fs = require "fs" | |
https = require "https" | |
http = require "http" | |
# SQL expression prefix. storePost emits it as "#{TS} * $7", which evaluates
# to the Unix epoch plus (utime × 1 second), with utime in seconds.
# NOTE(review): the result is a timestamptz written into a
# `timestamp without time zone` column, so the session TimeZone decides the
# stored wall-clock value — confirm the DB session runs in UTC.
TS = [
  "TIMESTAMP WITH TIME ZONE 'epoch'"
  "INTERVAL '1 second'"
].join " + "
# Base class for a polling hashtag aggregator backed by Postgres.
#
# Every `cfg.interval` ms, `iterate` calls the subclass's `grab` to fetch a
# raw JSON body. It passes the parsed value to the subclass's `extract`,
# which pushes normalized posts ({pid, text, owner, image, utime}) onto
# `@new_posts`. `processNewPosts` then stores every post whose pid is not in
# the in-memory cache into the table named `cfg.name`, and downloads its
# image into "#{cache_path}/#{name}/".
#
# Constructor arguments:
#   @pc  - Postgres client exposing `query`; presumably already connected.
#   @cfg - options object, mutated in place to fill defaults for name,
#          interval, cache_path, cache_mem and cache_url. `hashtag` is read
#          as-is and stored with every post.
class HashtagAggregator

  constructor: (@pc, @cfg) ->
    @cfg.name       ?= "dummy"
    @cfg.interval   ?= 5000
    @cfg.cache_path ?= "./cache"
    @cfg.cache_mem  ?= 1000
    @cfg.cache_url  ?= "http://localhost:8000"
    # Per-instance state. `new_posts` used to be a prototype property, i.e.
    # ONE array shared by every aggregator, so posts left behind by a failed
    # `extract` in one network were later stored into another's table.
    @new_posts        = []
    @totalPostCounter = 0
    @newPostCounter   = 0
    mkdirp.sync "#{@cfg.cache_path}/#{@cfg.name}"
    # pid -> post for recently stored posts. `@queue` records insertion order
    # so the oldest pid is evicted once more than `cache_mem` are held.
    @posts = {}
    @queue = []
    do @createScheme
    @iid = setInterval @iterate, @cfg.interval

  # Creates this aggregator's table if it does not exist yet. It exits the
  # process on failure, since nothing could be stored without the table.
  # (To start from scratch: DROP TABLE IF EXISTS <name> CASCADE;)
  createScheme: =>
    qry = """
      CREATE TABLE IF NOT EXISTS #{@cfg.name} (
        id SERIAL,
        pid character varying,
        owner character varying,
        text character varying,
        cache character varying,
        signed character varying,
        image character varying,
        hashtag character varying,
        utime timestamp(6) without time zone,
        moderated boolean DEFAULT false,
        posted boolean DEFAULT false,
        CONSTRAINT #{@cfg.name}_pid UNIQUE(pid)
      );
    """
    await @pc.query qry, defer(err, data)
    if err
      console.log err
      process.exit 1

  # Logs the post and inserts it. `post.utime` is seconds since the Unix
  # epoch and is converted in SQL via TS. Insert errors (for example a
  # duplicate pid, which violates the UNIQUE constraint) are only logged.
  storePost: (post) =>
    console.error "--- #{post.pid} ---".blue
    console.log "owner: ",   post.owner
    console.log "text: ",    post.text?[..140]   # text may be null/undefined
    console.log "image: ",   post.image
    console.log "cache: ",   post.cache
    console.log "hashtag: ", post.hashtag
    console.log "utime: ",   post.utime
    qry = """
      INSERT INTO #{@cfg.name}
        ( pid, owner, text, image, cache, hashtag, utime )
      VALUES ( $1, $2, $3, $4, $5, $6, #{TS} * $7 );
    """
    params = [
      post.pid
      post.owner
      post.text
      post.image
      post.cache
      post.hashtag
      post.utime
    ]
    @pc.query qry, params, (err, data) -> console.error err if err

  # Stops scheduling further polling ticks.
  stop: =>
    clearInterval @iid

  # One polling tick: grab -> extract -> processNewPosts, with counters logged.
  iterate: =>
    await @grab defer data
    try
      @extract JSON.parse data
    catch err
      # A failing parse/extract may already have pushed some posts. Drop them:
      # processNewPosts (which drains the array) is skipped on this path, so a
      # response that keeps failing would otherwise grow @new_posts forever.
      @new_posts.length = 0
      return console.error err
    @newPostCounter = 0
    console.error "=== #{@cfg.name} ===".green
    @processNewPosts()
    console.error "=== #{@cfg.name} #{@totalPostCounter}+#{@newPostCounter} ===".green
    @totalPostCounter += @newPostCounter

  # Fire-and-forget fetch of `url` (http or https) into
  # "#{cache_path}/#{name}/#{cache}". Failures are logged and the partial file
  # is removed. Previously they surfaced as unhandled 'error' events, which
  # crash the whole process.
  download: (url, cache) =>
    target = "#{@cfg.cache_path}/#{@cfg.name}/#{cache}"
    file = fs.createWriteStream target
    failed = false
    fail = (err) ->
      return if failed     # several streams may report the same failure
      failed = true
      console.error "download failed: #{url}", err
      file.destroy()
      fs.unlink target, -> # best effort; a truncated image is useless
    file.on 'error', fail
    client = if url.indexOf("https://") is 0 then https else http
    req = client.get url, (res) ->
      res.on 'error', fail
      unless res.statusCode is 200
        res.resume()       # drain the body so the socket is released
        return fail new Error "HTTP #{res.statusCode}"
      res.pipe file
    req.on 'error', fail

  # Drains @new_posts. Posts whose pid is already cached are skipped. Each
  # remaining post gets its image cached, is tagged with cfg.hashtag, stored,
  # counted and remembered.
  processNewPosts: =>
    while post = @new_posts.shift()
      # `until` (a loop) was used here as a condition. It looped forever when
      # cache_mem was 0, because the pid was evicted as soon as it was added.
      continue if post.pid of @posts
      if post.image?
        # Strip the query string so the cache name depends only on the path.
        if (qpos = post.image.indexOf "?") > 0
          post.image = post.image[...qpos]
        cache = "#{md5 post.image}.jpg"
        @download post.image, cache
        post.cache = "#{@cfg.cache_url}/#{@cfg.name}/#{cache}"
      else
        post.cache = null
      post.hashtag = @cfg.hashtag
      @storePost post
      @newPostCounter++
      @posts[post.pid] = post
      @queue.push post.pid
      delete @posts[@queue.shift()] if @queue.length > @cfg.cache_mem
    console.error "queue: #{@queue.length}"

  # INTERFACE — subclasses override both.
  # grab(cb): fetch the raw response body and call cb(body).
  grab: (cb) => console.error "GRAB NOT IMPLEMENTED".red
  # extract(json): push normalized posts onto @new_posts.
  extract: (json) => console.error "extract NOT IMPLEMENTED".red
### | |
INSTAGRAM IMPLEMENTATION
### | |
# Aggregator for Instagram's v1 "recent media by tag" endpoint.
# Reads cfg.instagram_cid / cfg.instagram_sec as the client credentials.
class Instagram extends HashtagAggregator

  # Fetches recent media for cfg.hashtag and calls cb with the raw body.
  # On a request error, body is undefined; iterate's JSON.parse then fails
  # and logs it.
  grab: (cb) =>
    params =
      client_id    : @cfg.instagram_cid
      client_secret: @cfg.instagram_sec
      min_tag_id   : 0
    # The tag is a path segment, so non-ASCII or reserved characters must be
    # percent-encoded.
    tag = encodeURIComponent @cfg.hashtag
    url = "https://api.instagram.com/v1/tags/#{tag}/media/recent?#{query.stringify params}"
    await request url, defer(err, res, body)
    console.error err if err
    cb body

  # Normalizes each item of json.data into a post and queues it.
  extract: (json) =>
    for post in json.data
      @new_posts.push
        pid  : post.id
        # The v1 API returns `caption: null` for media without a caption.
        # Dereferencing it threw and aborted the whole batch.
        text : post.caption?.text ? ""
        owner: post.user.username
        image: post.images.standard_resolution.url
        utime: post.created_time
### | |
VKONTAKTE IMPLEMENTATION
### | |
# Aggregator for VK's newsfeed.search method (API version 5.50).
class Vkontakte extends HashtagAggregator

  # Searches up to 200 posts containing "#<hashtag>" and calls cb with the
  # raw response body.
  grab: (cb) =>
    search = query.stringify v: "5.50", q: "##{@cfg.hashtag}", count: 200
    url = "https://api.vk.com/method/newsfeed.search?#{search}"
    await request url, defer(err, res, body)
    cb body

  # Turns each item of json.response.items into a post. The image is taken
  # from the first "photo" attachment only, using the largest of the
  # photo_1280 / 807 / 604 / 130 sizes present; otherwise it is null.
  extract: (json) =>
    for item in json.response.items
      image = null
      for att in (item.attachments or []) when att.type is "photo"
        {photo} = att
        image = photo.photo_1280 ? photo.photo_807 ? photo.photo_604 ? photo.photo_130 ? null
        break
      @new_posts.push
        pid  : "#{item.owner_id}_#{item.id}"
        text : item.text
        owner: item.owner_id
        image: image
        utime: item.date
### | |
TWITTER IMPLEMENTATION
### | |
# Aggregator for Twitter's v1.1 search/tweets endpoint.
# Requests are signed with the OAuth 1.0a credentials in cfg.oauth.
class Twitter extends HashtagAggregator

  # Searches tweets containing "#<hashtag>" and calls cb with the raw body.
  # On a request error, body is undefined; iterate's JSON.parse then fails
  # and logs it.
  grab: (cb) =>
    params =
      q: "##{@cfg.hashtag}"
    url = "https://api.twitter.com/1.1/search/tweets.json?#{query.stringify params}"
    await request {url: url, oauth: @cfg.oauth}, defer(err, res, body)
    console.error err if err
    cb body

  # Turns each status into a post. The image is the first "photo" media
  # entity's https URL, or null.
  extract: (json) =>
    for post in json.statuses
      image = null
      for m in (post.entities?.media ? []) when m.type is "photo"
        image = m.media_url_https ? null
        break
      # Convert created_at from milliseconds to whole seconds numerically.
      # The old code chopped digits off the string form, which produced ""
      # (making the INSERT fail) when Date.parse returned NaN.
      # NOTE(review): relies on the engine parsing Twitter's non-ISO format
      # ("Sun Apr 17 14:37:00 +0000 2016"); V8 does.
      ms = Date.parse post.created_at
      @new_posts.push
        pid  : post.id_str
        text : post.text
        owner: post.user.screen_name
        image: image
        utime: if isNaN(ms) then null else Math.floor(ms / 1000)
# Public API: the three concrete aggregators. The HashtagAggregator base class
# is not exported.
module.exports = {Instagram, Vkontakte, Twitter}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment