Wednesday, April 11, 2012

Using the Instagram API from a Python Flask App

Instagram came out with their Android app last week, and finally I had a solution to the age-old problem: all my home-cooked, healthy meals look like crap when photographed with my Android Nexus S camera, and so my EatDifferent stream was not so enticing. Thanks to the Instagram filters, I can now take photos of my meals that actually look somewhat palatable. I could have just taken photos with the Instagram app and uploaded them via the EatDifferent mobile app, but since Instagram offers an API, I wanted to see if I could use the API to automatically import photos from Instagram into EatDifferent. Well, as it turns out, I could, and it was a fairly easy feat, thanks in large part to the Python Instagram API wrapper. Here's a rundown of how it works.

Authenticating Users

Instagram uses OAuth2 for authentication, which means that my app needs a flow that redirects users to Instagram, gets a code back from them, exchanges that code for an access token, and then saves that access token for any time it wants to make authenticated requests on behalf of the user.

On the settings page, users click on a button that hits this view and redirects them to Instagram:

@app.route('/authorize-instagram')
def authorize_instagram():
    from instagram import client

    redirect_uri = (util.get_host() + url_for('handle_instagram_authorization'))
    instagram_client = client.InstagramAPI(client_id=INSTAGRAM_CLIENT, client_secret=INSTAGRAM_SECRET, redirect_uri=redirect_uri)
    return redirect(instagram_client.get_authorize_url(scope=['basic']))

Then, when Instagram redirects back to my app, it hits this view which upgrades to an access token and saves it:

@app.route('/handle-instagram-authorization')
def handle_instagram_authorization():
    from instagram import client

    code = request.values.get('code')
    if not code:
        return error_response('Missing code')
    try:
        redirect_uri = (util.get_host() + url_for('handle_instagram_authorization'))
        instagram_client = client.InstagramAPI(client_id=INSTAGRAM_CLIENT, client_secret=INSTAGRAM_SECRET, redirect_uri=redirect_uri)
        access_token, instagram_user = instagram_client.exchange_code_for_access_token(code)
        if not access_token:
            return error_response('Could not get access token')
        g.user.instagram_userid = instagram_user['id']
        g.user.instagram_auth   = access_token
        g.user.save()
        deferred.defer(fetch_instagram_for_user, g.user.get_id(), count=20, _queue='instagram')
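    except Exception as e:
        # Log the failure so it isn't silently swallowed.
        logging.exception('Instagram authorization failed: %s', e)
        return error_response('Error')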
    return redirect(url_for('settings_data') + '?after_instagram_auth=True')
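
For the curious, the wrapper hides most of the OAuth2 plumbing in those two views. Here's a rough sketch of what the redirect URL and the callback parsing might look like by hand. The endpoint URL and the parameter names are my assumptions about Instagram's OAuth2 flow, and `build_authorize_url` and `extract_code` are hypothetical helpers, not part of the wrapper:

```python
try:
    from urllib.parse import urlencode, urlparse, parse_qs  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2
    from urlparse import urlparse, parse_qs

# Assumption: Instagram's OAuth2 authorization endpoint.
AUTHORIZE_ENDPOINT = 'https://api.instagram.com/oauth/authorize/'


def build_authorize_url(client_id, redirect_uri, scope=('basic',)):
    """Build the URL that the user gets redirected to (hypothetical helper)."""
    params = [
        ('client_id', client_id),
        ('redirect_uri', redirect_uri),
        ('response_type', 'code'),    # ask for a code, not a token
        ('scope', ' '.join(scope)),
    ]
    return AUTHORIZE_ENDPOINT + '?' + urlencode(params)


def extract_code(callback_url):
    """Pull the 'code' parameter out of the callback URL, or return None."""
    query = parse_qs(urlparse(callback_url).query)
    values = query.get('code')
    return values[0] if values else None
```

The code that comes back is only good for a single exchange, which is why the view above immediately trades it for an access token and stores that instead.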

Parsing Posts

As you might notice in the above code, I call a method to fetch the user's latest Instagram posts after I've saved their authentication information. I defer that method using App Engine task queues, since it could take some time, and I don't need to do that while the user is waiting.

In the code to fetch the posts, I only process posts which are tagged with "ED" or "eatdifferent", since there may be times when a user wants to post something other than meals. I also check in memcache if I've already seen this update before processing it. I could also store a max ID seen for each user and do it that way, but given the small number of posts I'm processing on average, I went with the solution which uses more memcache hits but is also more straightforward.

def fetch_instagram_for_user(user_id, count=3):
    from instagram import client

    user = models.User.get_by_id(user_id)
    if not user.instagram_auth or not user.instagram_userid:
        return

    instagram_client = client.InstagramAPI(access_token=user.instagram_auth)
    # The second return value is the pagination URL; call it next_url so it
    # doesn't shadow the builtin next().
    recent_media, next_url = instagram_client.user_recent_media(user_id=user.instagram_userid, count=count)
    for media in recent_media:
        tags = [tag.name.lower() for tag in media.tags]
        if not ('eatdifferent' in tags or 'ed' in tags):
            continue
        cache_key = 'instagram-%s-%s' % (user.get_id(), media.id)
        # Skip posts that were already imported.
        if util.get_from_cache(cache_key):
            continue
        imports.import_instagram(user, media)
        util.put_in_cache(cache_key, 'true')
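
To make the filter-then-dedupe logic easier to try outside App Engine, here's a minimal standalone sketch of the same idea. `InMemoryCache` is a hypothetical stand-in for memcache, and `select_new_posts` takes plain `(media_id, tags)` pairs rather than the wrapper's media objects; both are assumptions for illustration only:

```python
class InMemoryCache(object):
    """Hypothetical stand-in for memcache, backed by a dict."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value):
        self._store[key] = value


WANTED_TAGS = frozenset(['eatdifferent', 'ed'])


def select_new_posts(user_id, posts, cache):
    """Return IDs of posts that carry a wanted tag and haven't been seen yet.

    posts is an iterable of (media_id, tags) pairs.
    """
    selected = []
    for media_id, tags in posts:
        lowered = set(tag.lower() for tag in tags)
        if not (lowered & WANTED_TAGS):
            continue  # not a meal post
        cache_key = 'instagram-%s-%s' % (user_id, media_id)
        if cache.get(cache_key):
            continue  # already imported
        cache.set(cache_key, 'true')
        selected.append(media_id)
    return selected
```

Running it twice over the same posts returns an empty list the second time, which is the behavior I want when Instagram pings me about an update I've already imported.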

Subscribing to Posts

Now, I want to know whenever an authenticated user uploads a new photo, so that I can import it if it's tagged appropriately. The Instagram API uses parts of the PubSubHubbub protocol to let you subscribe to real-time updates. You can subscribe to all posts with particular tags, but you can also subscribe to all posts by your app's authenticated users, and in my case, that's the lower-noise option. (There's a surprising number of folks using the tag "#ED", presumably tagging everyone they know named "Ed".)

I only had to set up the subscription once (well, once for the test server and once for the deployed one), using this code:

    instagram_client = client.InstagramAPI(client_id=INSTAGRAM_CLIENT, client_secret=INSTAGRAM_SECRET)
    callback_url = 'http://www.eatdifferent.com/hook/parse-instagram'
    instagram_client.create_subscription(object='user', aspect='media', callback_url=callback_url)

When my app gets hit at the callback URL, it goes to this view, which either responds to the hub challenge (sent the first time Instagram hits the callback URL, to verify the subscription) or, if it's an actual update, verifies that it's from Instagram and calls another function to parse the update.

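# The challenge arrives as a GET and updates arrive as POSTs, so allow both.
@app.route('/hook/parse-instagram', methods=['GET', 'POST'])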
def parse_instagram():
    from instagram import client, subscriptions

    mode         = request.values.get('hub.mode')
    challenge    = request.values.get('hub.challenge')
    verify_token = request.values.get('hub.verify_token')
    if challenge: 
        return Response(challenge)
    else:
        reactor = subscriptions.SubscriptionsReactor()
        reactor.register_callback(subscriptions.SubscriptionType.USER, parse_instagram_update)

        x_hub_signature = request.headers.get('X-Hub-Signature')
        raw_response    = request.data
        try:
            reactor.process(INSTAGRAM_SECRET, raw_response, x_hub_signature)
        except subscriptions.SubscriptionVerifyError:
            logging.error('Instagram signature mismatch')
    return Response('Parsed instagram')
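
The signature check is the part the reactor handles for me. As far as I understand it, the X-Hub-Signature header is a hex HMAC-SHA1 of the raw request body, keyed with the client secret. Treat that as my assumption rather than a spec, and `is_valid_signature` as a hypothetical helper. A minimal sketch of the check:

```python
import hashlib
import hmac


def is_valid_signature(client_secret, raw_body, x_hub_signature):
    """Check a hex HMAC-SHA1 signature over the raw request body.

    client_secret and raw_body must be bytes. The header format is an
    assumption (a bare hex digest, with no 'sha1=' prefix).
    """
    if not x_hub_signature:
        return False
    expected = hmac.new(client_secret, raw_body, hashlib.sha1).hexdigest()
    # compare_digest avoids leaking where the two strings differ via timing.
    return hmac.compare_digest(expected, x_hub_signature)
```

The important detail is to sign the raw bytes of the body exactly as received; re-serializing the parsed JSON would change the bytes and break the comparison.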

In this function, I extract the Instagram user ID from the update JSON, find the user(s) that connected with that ID, and once again set up a deferred task to fetch their updates. I also set a countdown of 2 minutes for that task, as I saw issues where the update would exist in the Instagram API but wouldn't have all the data yet (like the tags), maybe because of a stale data propagation issue on their side.

def parse_instagram_update(update):
    instagram_userid = update['object_id']
    users = models.User.all().filter('instagram_userid =', instagram_userid).fetch(10)
    if not users:
        logging.info("Didn't find matching users for this update")
    for user in users:
        deferred.defer(fetch_instagram_for_user, user.get_id(), _queue='instagram', _countdown=120)
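
For context, the reactor hands my callback one update at a time, but the raw POST body is (as far as I've seen) a JSON list of update objects. Here's a small sketch of pulling the user IDs out of that body by hand. The payload shape and the `object` and `object_id` keys are assumptions based on what I observed, and `extract_user_ids` is a hypothetical helper:

```python
import json


def extract_user_ids(raw_body):
    """Return the distinct user IDs mentioned in a real-time update payload.

    Assumes the body is a JSON list of dicts, each with 'object' and
    'object_id' keys. Updates for anything other than users are ignored.
    """
    updates = json.loads(raw_body)
    user_ids = []
    for update in updates:
        if update.get('object') != 'user':
            continue
        object_id = update.get('object_id')
        if object_id is not None and object_id not in user_ids:
            user_ids.append(object_id)
    return user_ids
```

Note that the update only says that something changed for that user, not what the new photo is, which is why I still have to fetch their recent media afterwards.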

And that's pretty much it. It's a fun app and a fun API. Hopefully they both stick around after their Facebook acquisition this week. ☺
