Lighting a Spark

Audience: Anyone who wants to set up a Spark cluster.

Prerequisites: Knowledge of Docker, docker-compose, and PySpark

Setting up a Spark Standalone Cluster

To set this up we need a few ingredients: Java, Spark, and Python for running PySpark applications. We will set up one master node and three workers, although you can scale up or down as you see fit. The master and the workers come from the same image; only the entrypoint differs. Your Java installation needs to be 1.8, as that is what Spark 2.4.x runs on.

For my Dockerfile I start off with a base Alpine Java 8 image. While this works well for standalone mode, you might prefer Ubuntu, as I have run into some issues (none that cannot be fixed) when trying to set up a YARN cluster using the same image. So, in your project directory, create a Dockerfile and add the following lines to it:

FROM openjdk:8-alpine

USER root

# wget, tar, bash for getting spark and hadoop
RUN apk --update add wget tar bash

This uses the Alpine image as our base, switches to the root user, and installs the packages needed for the next step: downloading the Spark tarball and unpacking it into a directory. I am using /spark as my Spark folder. I then add it to my PATH variable and also declare SPARK_HOME as an environment variable that points to /spark. From what I have seen so far, declaring SPARK_HOME isn't strictly necessary, since Spark is smart enough to find the path if the variable isn't set, but it doesn't hurt.

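# Download Spark, unpack it into /spark and remove the archive.
# The download URL is an assumption (Apache archive); any mirror hosting this tarball works.
RUN wget https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && \
    tar -xzf spark-2.4.5-bin-hadoop2.7.tgz && \
    mv spark-2.4.5-bin-hadoop2.7 /spark && \
    rm spark-2.4.5-bin-hadoop2.7.tgz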

# add to PATH
ENV PATH $PATH:/spark/bin:/spark/sbin

ENV SPARK_HOME /spark

Now we have both Java and Spark on our system. To use PySpark we still need to install Python, which the following piece of code accomplishes.

# Install components for Python
RUN apk add --no-cache --update \
    git \
    libffi-dev \
    openssl-dev \
    zlib-dev \
    bzip2-dev \
    readline-dev \
    sqlite-dev \
    musl \
    libc6-compat \
    linux-headers \
    build-base \
    procps 

# Set Python version
ARG PYTHON_VERSION='3.7.6'
# Set pyenv home
ARG PYENV_HOME=/root/.pyenv

# Install pyenv, then install python version
RUN git clone --depth 1 https://github.com/pyenv/pyenv.git $PYENV_HOME && \
    rm -rfv $PYENV_HOME/.git

ENV PATH $PYENV_HOME/shims:$PYENV_HOME/bin:$PATH

RUN pyenv install $PYTHON_VERSION
RUN pyenv global $PYTHON_VERSION
RUN pip install --upgrade pip && pyenv rehash

# Clean
RUN rm -rf ~/.cache/pip

The lines above first install all the packages needed to build Python: git for cloning the pyenv repository, zlib-dev and bzip2-dev as compression libraries required by the build, and so on. I arrived at this list through some trial and error during installation. We then clone pyenv, use it to build the requested Python version, make that version the global default, and upgrade pip.

The last step in the Dockerfile is embedding the shell scripts that act as the entrypoint to the Docker containers. The entrypoint is just the command that starts the Spark master or worker node as a daemon. So let's first create another directory called scripts in our project directory and add two shell files under it: run_master.sh and run_worker.sh.

run_master.sh has the command to start the Spark master node
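
Once the master and workers are running, a tiny PySpark job is a quick way to confirm that the cluster accepts work. The sketch below is only an illustration: the hostname spark-master is a hypothetical placeholder for whatever the master service is called in your compose file, 7077 is the default port of a standalone master, and pyspark is assumed to be installed wherever the script runs.

```python
def master_url(host="spark-master", port=7077):
    """Build a standalone master URL. The default hostname is a placeholder."""
    return "spark://{}:{}".format(host, port)


def smoke_test(url):
    """Run a tiny job on the cluster and return 0 + 1 + ... + 99."""
    # Imported here so master_url() can be used without pyspark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master(url).appName("smoke-test").getOrCreate()
    try:
        # Distribute the numbers across the workers and add them up.
        return spark.sparkContext.parallelize(range(100)).sum()
    finally:
        spark.stop()
```

If the job runs, smoke_test(master_url()) should return 4950, and the application should appear in the master's web UI.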

Different ways of Fibonacci generation using Python

One of the most oft-cited coding questions, especially in internship interviews, is generating the Fibonacci sequence. Here I provide several ways to generate Fibonacci numbers in Python, including a generator.


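def fib_r(n):
    # Naive recursion: exponential time, fine only for small n
    if n == 0:
        return 0
    if n == 1:
        return 1
    return fib_r(n - 1) + fib_r(n - 2)


def fibBinet(n):
    # Binet's closed form; float rounding makes it inexact for large n
    phi = (1 + 5**0.5) / 2.0
    return int(round((phi**n - (1 - phi)**n) / 5**0.5))


def fib_dp(n):
    # Bottom-up table holding every value up to n
    l = [0, 1]
    for i in range(2, n + 1):
        l.append(l[i - 1] + l[i - 2])
    return l[n]


def fib_ultimate(n):
    # Iterative version that only keeps the last two values
    if n == 0:
        return 0
    a, b = 0, 1
    for i in range(n - 1):
        a, b = b, a + b
    return b


def fib_gen():
    # Infinite generator: yields 0, 1, 1, 2, 3, ...
    a, b = 0, 1
    while True:            # First iteration:
        yield a            # yield 0 to start with and then
        a, b = b, a + b    # a will now be 1, and b will also be 1, (0 + 1)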
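
Two practical notes on these functions, shown as a small sketch. Because the generator never terminates, it has to be sliced, here with itertools.islice. And because the Binet version works in floating point, it eventually drifts away from the exact integer values; the helper first_binet_mismatch (a name introduced here for illustration) searches for the first n where that happens rather than assuming a particular value.

```python
from itertools import islice


def fib_gen():
    """Infinite generator: yields 0, 1, 1, 2, 3, ..."""
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b


def fib_binet(n):
    """Binet's closed form evaluated in floating point."""
    phi = (1 + 5 ** 0.5) / 2.0
    return int(round((phi ** n - (1 - phi) ** n) / 5 ** 0.5))


def first_n(n):
    """Take the first n Fibonacci numbers from the infinite generator."""
    return list(islice(fib_gen(), n))


def first_binet_mismatch(limit=200):
    """Smallest n below limit where the float formula differs from the exact value."""
    for n, exact in enumerate(islice(fib_gen(), limit)):
        if fib_binet(n) != exact:
            return n
    return None


print(first_n(10))             # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
print(first_binet_mismatch())  # depends on the platform's floating point
```

The iterative and generator versions use Python's arbitrary-precision integers, so they stay exact for any n.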


Using Bootstrap pagination to page through divs on same page

I have been playing around with the Bootstrap framework, and it's a boon for layout: it provides the tools to lay out a page without much CSS work. It includes a simple pagination component, but most tutorials online, including the Bootstrap documentation, focus on the HTML layout rather than on making it functional. I am using Bootstrap pagination to page through content on a single page that contains multiple divs. An active page is actually an active div, which I swap out when moving to the next page, so in a way it acts like a carousel (which is what I looked at initially). So without further ado, here's the HTML to set up the pagination:

<div class="pagination-container" >
   <div data-page="1" >
      <p>Content for Div Number 1</p>
   </div>
   <div data-page="2" style="display:none;">
      <p>Content for Div Number 2</p>
   </div>
   <div data-page="3" style="display:none;">
      <p>Content for Div Number 3</p>
   </div>
   <div data-page="4" style="display:none;">
      <p>Content for Div Number 4</p>
   </div>
   <div data-page="5" style="display:none;">
      <p>Content for Div Number 5</p>
   </div>

   <div class="text-center">
   <div class="pagination pagination-centered">
       <ul class="pagination ">
            <li data-page="-" ><a href="#" >&lt;</a></li>
            <li data-page="1"><a href="#" >1</a></li>
            <li data-page="2"><a href="#" >2</a></li>
            <li data-page="3"><a href="#" >3</a></li>
            <li data-page="4"><a href="#" >4</a></li>
            <li data-page="5"><a href="#" >5</a></li>
            <li data-page="+"><a href="#" >&gt;</a></li>
      </ul>
   </div></div></div>

And here's the JavaScript for swapping out the divs; it relies on jQuery. If you wish to use the Bootstrap styling, make sure the page is structured as shown above. The ul class needs to be named pagination, and it needs to be enclosed in a div with class "text-center", which helps with centering the page controls.

<script>
var paginationHandler = function(){
    // store pagination container so we only select it once
    var $paginationContainer = $(".pagination-container"),
        $pagination = $paginationContainer.find('.pagination ul');
    // click event
    $pagination.find("li a").on('click.pageChange',function(e){
        e.preventDefault();
        // get parent li's data-page attribute and current page
        var parentLiPage = $(this).parent('li').data("page"),
            currentPage = parseInt( $(".pagination-container div[data-page]:visible").data('page') ),
            numPages = $paginationContainer.find("div[data-page]").length;
        // make sure they aren't clicking the current page
        if ( parseInt(parentLiPage) !== parseInt(currentPage) ) {
            // hide the current page
            $paginationContainer.find("div[data-page]:visible").hide();
            if ( parentLiPage === '+' ) {
                // next page
                $paginationContainer.find("div[data-page="+( currentPage+1>numPages ? numPages : currentPage+1 )+"]").show();
            } else if ( parentLiPage === '-' ) {
                // previous page
                $paginationContainer.find("div[data-page="+( currentPage-1<1 ? 1 : currentPage-1 )+"]").show();
            } else {
                // specific page
                $paginationContainer.find("div[data-page="+parseInt(parentLiPage)+"]").show();
            }
        }
    });
};
$( document ).ready( paginationHandler );
</script>
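
The only real logic in the handler is deciding which page to show for a given click. Stripped of the DOM calls, it can be sketched as a pure function; Python is used here purely for illustration, and the function name target_page is made up for this sketch.

```python
def target_page(clicked, current, num_pages):
    """Return the page to show after a click, or None if nothing should change.

    clicked is "+", "-", or a page number, mirroring the data-page values
    on the <li> elements; current and num_pages are 1-based integers.
    """
    if clicked == "+":
        return min(current + 1, num_pages)   # clamp at the last page
    if clicked == "-":
        return max(current - 1, 1)           # clamp at the first page
    page = int(clicked)
    return None if page == current else page


print(target_page("+", 5, 5))   # 5: already on the last page
print(target_page("-", 1, 5))   # 1: already on the first page
print(target_page(3, 2, 5))     # 3
```

Clamping at both ends is what keeps the next and previous arrows from hiding every div when the user is already on the last or first page.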

Graph class

Here's a simple Graph data structure that builds upon the previous Vertex data structure. The graph maintains an inner dictionary of all the vertices in it and a count of the vertices. There are functions to create a vertex and to add an edge between two vertices; internally these call Vertex member functions.

Header


#ifndef __graphs__graph__
#define __graphs__graph__

#include <map>
#include <memory>
#include <vector>
#include "vertex.h"

class Graph {
    // vertex id -> shared pointer to the vertex
    std::map<char, std::shared_ptr<Vertex>> _vertDict;
    int _numVertices = 0;   // initialised so addVertex can increment it safely
    
public:
    Graph(){};
    std::vector<char> getVertices();
    void addVertex(char);
    std::shared_ptr<Vertex> getvertex(char);
    void addEdge(char,char,int);
    int getWeight(char,char);
};

#endif /* defined(__graphs__graph__) */

Source


#include "graph.h"
void Graph::addVertex(char id){
    std::shared_ptr<Vertex> pv = std::make_shared<Vertex>(id);
    // insert() leaves the map unchanged if the id already exists, so only count new vertices
    if (_vertDict.insert(std::map<char, std::shared_ptr<Vertex>>::value_type(id, pv)).second) {
        _numVertices++;
    }
}

// Adds a directed edge id1 -> id2; does nothing if either vertex is missing
void Graph::addEdge(char id1, char id2, int weight){
    std::map<char, std::shared_ptr<Vertex>>::iterator it1 = _vertDict.find(id1);
    std::map<char, std::shared_ptr<Vertex>>::iterator it2 = _vertDict.find(id2);
    if (it1 == _vertDict.end() || it2 == _vertDict.end()) {
        return;
    }else{
        it1->second->addNeighbor(weight, it2->second);
    }
}

std::shared_ptr<Vertex> Graph::getvertex(char id){
    std::map<char, std::shared_ptr<Vertex>>::iterator it = _vertDict.find(id);
    if (it != _vertDict.end()) {
        return  it->second;
    }else{
        return nullptr;
    }
}

std::vector<char> Graph::getVertices(){
    std::vector<char> ids;
    for (std::map<char, std::shared_ptr<Vertex>>::iterator iter = _vertDict.begin(); iter != _vertDict.end(); ++iter){
        ids.push_back(iter->first);
    }
    return ids;
}

// Returns the weight of the edge id1 -> id2, or -1 if there is no such edge
int Graph::getWeight(char id1, char id2){
    std::map<char, std::shared_ptr<Vertex>>::iterator it1 = _vertDict.find(id1);
    std::map<char, std::shared_ptr<Vertex>>::iterator it2 = _vertDict.find(id2);
    if (it1 != _vertDict.end() && it2 != _vertDict.end()) {
        return it1->second->getWeight(*it2->second);
    }else{
        return -1;
    }
}

Test


#include <iostream>
#include "vertex.h"
#include "graph.h"

int main(int argc, const char * argv[]) {    
    Graph g;
    g.addVertex('a');
    g.addVertex('b');
    g.addVertex('c');
    g.addEdge('a', 'b', 9);
    g.addEdge('a', 'c', 5);
    for (auto i: g.getVertices())
        std::cout << i <<'\n';
    std::shared_ptr<Vertex> v1;
    v1 = g.getvertex('a');
    for (auto i: v1->getConnections())
        std::cout << i.second << ':'<< i.first <<'\n';
    std::cout<< g.getWeight('a', 'b') << '\n';
    return 0;
}
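
For readers who want to see the shape of the data structure without the C++ machinery, here is a rough Python sketch of the same idea: a dictionary of vertices, each holding a list of (weight, neighbor) pairs. The method names are Python-style renderings chosen for this sketch, and edges are directed, as in the C++ version.

```python
class Vertex:
    def __init__(self, vid):
        self.id = vid
        self.adj = []   # (weight, neighbour) pairs, standing in for the multimap

    def add_neighbor(self, weight, neighbor):
        self.adj.append((weight, neighbor))

    def _by_weight(self):
        # A multimap iterates in key order; a stable sort on weight mimics that.
        return sorted(self.adj, key=lambda pair: pair[0])

    def get_connections(self):
        return [(w, v.id) for w, v in self._by_weight()]

    def get_weight(self, other):
        for w, v in self._by_weight():
            if v.id == other.id:
                return w
        return -1


class Graph:
    def __init__(self):
        self.vert_dict = {}   # vertex id -> Vertex

    def add_vertex(self, vid):
        self.vert_dict.setdefault(vid, Vertex(vid))

    def add_edge(self, id1, id2, weight):
        # Silently ignore edges whose endpoints are unknown, like the C++ code.
        if id1 in self.vert_dict and id2 in self.vert_dict:
            self.vert_dict[id1].add_neighbor(weight, self.vert_dict[id2])

    def get_vertices(self):
        return sorted(self.vert_dict)   # std::map iterates keys in sorted order

    def get_weight(self, id1, id2):
        if id1 in self.vert_dict and id2 in self.vert_dict:
            return self.vert_dict[id1].get_weight(self.vert_dict[id2])
        return -1


g = Graph()
for vid in "abc":
    g.add_vertex(vid)
g.add_edge("a", "b", 9)
g.add_edge("a", "c", 5)
print(g.get_vertices())                       # ['a', 'b', 'c']
print(g.vert_dict["a"].get_connections())     # [(5, 'c'), (9, 'b')]
print(g.get_weight("a", "b"))                 # 9
```

Because edges are stored only on the source vertex, g.get_weight("b", "a") returns -1 in this sketch.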

Print all non-unique items in a python list preserving the order

This question was asked of me in an interview and actually took me quite a while to figure out, since I wasn't really familiar with the awesomeness that is Python. Here is a one-line implementation: basically, keep only the values whose count is greater than one.

# return the items of a list that are non-unique, preserving their order
# eg l = [1,2,3,4,4,4,5,1,2,7,8,8,10] will give [1, 2, 4, 4, 4, 1, 2, 8, 8]
def printNonUnique(l):
    # a comprehension keeps falsy items such as 0 and returns a list on Python 3
    return [x for x in l if l.count(x) > 1]
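
Calling l.count(x) for every element rescans the list each time, which is quadratic in the list length. A possible variant, assuming the items are hashable, counts everything once with collections.Counter and then filters in a single pass; the function name non_unique is made up for this sketch.

```python
from collections import Counter


def non_unique(items):
    """Return the items that occur more than once, in their original order."""
    counts = Counter(items)          # one pass to count every value
    return [x for x in items if counts[x] > 1]


print(non_unique([1, 2, 3, 4, 4, 4, 5, 1, 2, 7, 8, 8, 10]))
# [1, 2, 4, 4, 4, 1, 2, 8, 8]
```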


Simple Vertex Class

I am getting back into C++ in my downtime. This is a simple Vertex class that has two members: an id and an adjacency list. The list is a multimap, keyed by edge weight, that holds references to the Vertex's neighbors together with the weight of the path from the Vertex to each neighbor. The references are shared pointers since I didn't wish to deal with manual memory management.

Vertex header class

#ifndef __graphs__vertex__
#define __graphs__vertex__

#include <map>
#include <memory>   // std::shared_ptr
#include <utility>  // std::pair
#include <vector>

class Vertex {
    char _id;
    std::multimap<int, std::shared_ptr<Vertex>> _adjList;
    
public:
    void addNeighbor(int weight,std::shared_ptr<Vertex> neighbor);
    std::vector<std::pair<int,char>> getConnections();
    char getId();
    int getWeight(Vertex);
    Vertex(char);
    friend bool operator== (Vertex & lhs, Vertex & rhs );
    
};
#endif /* defined(__graphs__vertex__) */

Vertex definition class

#include "vertex.h"

Vertex::Vertex(char id){
    _id = id;
}

void Vertex::addNeighbor(int weight,std::shared_ptr<Vertex> neighbor){
    _adjList.insert(std::multimap<int, std::shared_ptr<Vertex>>::value_type(weight, neighbor));
}

std::vector<std::pair<int,char>> Vertex::getConnections(){
    std::vector<std::pair<int,char>> ids;
    for(std::multimap<int, std::shared_ptr<Vertex>>::iterator it = _adjList.begin(); it != _adjList.end(); ++it) {
        ids.push_back(std::make_pair(it->first, it->second->_id));
    };
    return ids;
}

char Vertex::getId(){
    return _id;
}

int Vertex::getWeight(Vertex v){
    for(std::multimap<int, std::shared_ptr<Vertex>>::iterator it = _adjList.begin(); it != _adjList.end(); ++it) {
        if (*it->second==v) {
            return it->first;
        };
    };
    return -1;
}

bool operator== (Vertex & lhs, Vertex & rhs ){
    return (lhs.getId() == rhs.getId()) ;
}

Main file for example

#include <iostream>
#include "vertex.h"

int main(int argc, const char * argv[]) {
    Vertex v1('a');
    v1.addNeighbor(9, std::make_shared<Vertex>('b'));
    v1.addNeighbor(6, std::make_shared<Vertex>('c'));
    for (auto i: v1.getConnections())
        std::cout << i.second << ':'<< i.first <<'\n';
    return 0;
}



Calculating BS European Call & Put prices

A simple C++ script to calculate Black-Scholes European call and put prices.
The inputs are the underlying asset price (S), the strike price (K), the risk-free rate (r) and the volatility of the underlying (v), both given as decimals rather than percentages, the time to maturity in years (T), and a flag selecting a call or a put.
The output is the calculated call or put price.

#include "stdafx.h"

namespace BS{

    // forward declaration: getCDF below calls stdnorm before its definition
    double stdnorm(double x);

    double getCDF(double x){

        double b0 = 0.2316419;
        double b1 = 0.319381530;
        double b2 = -0.356563782;
        double b3 = 1.781477937;
        double b4 = -1.821255978;
        double b5 = 1.330274429;
        double t = 1 / (1 + b0*x);

        double prod = b1*t + b2*pow(t, 2) + b3*pow(t, 3) + b4*pow(t, 4) + b5*pow(t, 5);

        if (x >= 0.0) {
            return (1 - stdnorm(x)*prod);
        }
        else {
            return 1.0 - getCDF(-x);
        }

    }

    double stdnorm(double x) {

        return (1.0 / (pow(2 * M_PI, 0.5)))*exp(-0.5*x*x);
    }


    BSEur::BSEur() :
        r(0.1), v(0.25), K(100), T(1.0), S(100), isCall(true){}

    BSEur::BSEur(double r, double v, double K, double T, double S, bool isCall){

        this->r = r;
        this->v = v;
        this->K = K;
        this->T = T;
        this->S = S;
        this->isCall = isCall;
    }

    BSEur::~BSEur(){}

    double BSEur::getd(){
        return (log(S / K) + (r + pow(v, 2)*0.5)*(T)) / (v*sqrt(T));
    }

    double BSEur::getCallPrice(){
        double d1 = getd();
        double d2 = d1 - v*sqrt(T);

        /*printf("d1: %lf\n", d1);
		printf("d2: %lf\n", d2);*/

        return (S*getCDF(d1)) - (K*exp(-r*(T))*getCDF(d2));
    }

    double BSEur::getPutPrice(){
        double d1 = getd();
        double d2 = d1 - v*sqrt(T);

        // put = K*exp(-rT)*N(-d2) - S*N(-d1); note the minus sign on d1
        return (K*exp(-r*(T))*getCDF(-d2)) - (S*getCDF(-d1));
    }


    void BSEur::printPrice(){
        if (isCall)
        {
            cout << "Price of option: " << getCallPrice()<<endl;
        }
        else
        {
            cout << "Price of option: " <<  getPutPrice()<<endl;
        }
    }
}
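
A cheap way to sanity-check any Black-Scholes implementation is put-call parity: for the same inputs, call minus put must equal S - K*exp(-r*T). The Python sketch below is an independent illustration, not a port of the class above. It computes the normal CDF with math.erf, and also includes the polynomial approximation with the same constants as getCDF so the two can be compared. The function names are chosen for this sketch.

```python
import math


def norm_pdf(x):
    """Standard normal probability density."""
    return math.exp(-0.5 * x * x) / math.sqrt(2.0 * math.pi)


def norm_cdf(x):
    """Standard normal CDF via the error function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def norm_cdf_poly(x):
    """Polynomial approximation of the CDF, same constants as getCDF."""
    if x < 0:
        return 1.0 - norm_cdf_poly(-x)
    t = 1.0 / (1.0 + 0.2316419 * x)
    poly = t * (0.319381530 + t * (-0.356563782 + t * (1.781477937
               + t * (-1.821255978 + t * 1.330274429))))
    return 1.0 - norm_pdf(x) * poly


def bs_prices(S, K, r, v, T):
    """Return (call, put) for a European option under Black-Scholes."""
    d1 = (math.log(S / K) + (r + 0.5 * v * v) * T) / (v * math.sqrt(T))
    d2 = d1 - v * math.sqrt(T)
    discounted_strike = K * math.exp(-r * T)
    call = S * norm_cdf(d1) - discounted_strike * norm_cdf(d2)
    put = discounted_strike * norm_cdf(-d2) - S * norm_cdf(-d1)
    return call, put


# Same defaults as the BSEur default constructor.
call, put = bs_prices(S=100.0, K=100.0, r=0.1, v=0.25, T=1.0)
parity_gap = (call - put) - (100.0 - 100.0 * math.exp(-0.1 * 1.0))
print(call, put, parity_gap)   # parity_gap should be numerically zero
```

If the gap is not close to zero, one of the two prices is wrong, most often because of a sign slip in the put formula.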

Calculating Greeks in C

In quantitative finance, the Greeks are the quantities representing the sensitivity of the price of derivatives such as options to a change in the underlying parameters on which the value of an instrument or portfolio of financial instruments depends. The name is used because the most common of these sensitivities are denoted by Greek letters. There are plenty of online resources for understanding the underlying mathematical formulas, but here's a quick script that computes them for a European call:

//
//  main.c
//  stat598a1
//
//  Created by Fizi Yadav on 1/20/14.
//  Copyright (c) 2014 Fizi Yadav. All rights reserved.
//

#include <stdio.h>
#include <math.h>


double getCDF(double);
double stdnorm(double);
double getd(double S, double K, double r, double v, double T, double t);
double getCallPrice(double S, double K, double r, double v, double T,double t);
double getDelta(double S, double K, double r, double v, double T, double t);
double getGamma(double S, double K, double r, double v, double T, double t);
double getVega(double S, double K, double r, double v, double T, double t);
double getTheta(double S, double K, double r, double v, double T, double t);
double getRho(double S, double K, double r, double v, double T, double t);
void printGreeks(double S, double K, double r, double v, double T, double t);


int main(int argc, const char * argv[])
{
    
    double S = 40.0;   // Underlying asset price
    double K = 45.0;   // Strike price
    double r = 0.08;    // Risk-free rate. Percentages must be provided as decimal eg: (5%) as (0.05)
    double v = 0.05;    // Volatility of the underlying. Percentages must be provided as decimal
    double T = 3.0;     // Time of maturity. Must be in years
    double t = 0.0;     // Start time
    
    printf("\nParameters:\n");
    printf("Underlying Asset Price: %lf\n", S);
    printf("Strike Price %lf\n", K);
    printf("Risk-Free Rate: %lf\n", r);
    printf("Volatility: %lf\n", v);
    printf("Time to maturity: %lf\n", T-t);
    
    printf("\nCall Price: %lf\n", getCallPrice(S, K, r, v, T, t));
    
    printGreeks(S, K, r, v, T, t);

    return 0;
}


//calculate normal CDF given x

double getCDF(double x){
    
    double b0 = 0.2316419;
    double b1 = 0.319381530;
    double b2 = -0.356563782;
    double b3 = 1.781477937;
    double b4 = -1.821255978;
    double b5 = 1.330274429;
    double t = 1/(1+b0*x);
    //double stdnorm = 0.398942*pow(2.71828, -0.5*pow(x, 2));
    
    double prod = b1*t+b2*pow(t,2)+b3*pow(t,3)+b4*pow(t,4)+b5*pow(t,5);
    
    if (x >= 0.0) {
        return (1-stdnorm(x)*prod);
    } else {
        return 1.0 - getCDF(-x);
    }
    
}

//calculate call price of a european option

double getCallPrice(double S, double K, double r, double v, double T, double t){
    
    double d1=getd(S, K, r, v, T, t);
    double d2 = d1 - v*sqrt(T-t);
    
    printf("d1: %lf\n", d1);
    printf("d2: %lf\n", d2);
    
    return (S*getCDF(d1))-(K*exp(-r*(T-t))*getCDF(d2));
    
}

//print the greeks of a european call option

void printGreeks(double S, double K, double r, double v, double T, double t){
    
    /* some sensitivities are quoted in scaled-down terms, to match the scale of likely changes in the parameters. Rho is reported divided by 100 , vega by 100 (1 vol point change), and theta by 365 or 252 (1 day decay based on either calendar days or trading days per year). */
    
    printf("\nGreeks:\n");
    
    printf("Delta: %lf\n", getDelta(S, K, r, v, T,t));
    printf("Gamma: %lf\n", getGamma(S, K, r, v, T,t));
    printf("Theta: %lf\n", getTheta(S, K, r, v, T,t)/365.0);
    printf("Vega: %lf\n", getVega(S, K, r, v, T,t)/100.0);
    printf("Rho: %lf\n", getRho(S, K, r, v, T,t)/100.0);
    
}


// Standard normal probability density function
double stdnorm(double x) {
    
    return (1.0/(pow(2*M_PI,0.5)))*exp(-0.5*x*x);
}

double getd(double S, double K, double r, double v, double T, double t){
    
    return (log(S/K)+(r+pow(v,2)*0.5)*(T-t))/(v*sqrt(T-t));
    
}



// Calculate the European call Delta

double getDelta(double S, double K, double r, double v, double T, double t) {
    return getCDF(getd(S, K, r, v, T, t));
}

// Calculate the European call Gamma

double getGamma(double S, double K, double r, double v, double T, double t) {
    return stdnorm(getd(S, K, r, v, T, t))/(S*v*sqrt(T-t));
}

// Calculate the European call Vega

double getVega(double S, double K, double r, double v, double T, double t) {
    return S*stdnorm(getd(S, K, r, v, T, t))*sqrt(T-t);
}

// Calculate the European call Theta

double getTheta(double S, double K, double r, double v, double T, double t) {
    
    double d1=getd(S, K, r, v, T, t);
    double d2 = d1 - v*sqrt(T-t);
    
    return (-(S*stdnorm(d1)*v)/(2*sqrt(T-t)))
    - (r*K*exp(-r*(T-t))*getCDF(d2));
}

// Calculate the European call Rho

double getRho(double S, double K, double r, double v, double T, double t) {
    
    return K*(T-t)*exp(-r*(T-t))*getCDF(getd(S, K, r, v, T, t)-(v*sqrt(T-t)));
}
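
Since each Greek is a partial derivative of the call price, the closed-form expressions can be checked numerically by bumping one input at a time. The Python sketch below does this with central differences for delta, vega and rho, using the same parameters as main above. It is an independent illustration with names chosen here, it uses math.erf for the normal CDF rather than the polynomial approximation, and the step sizes are ad hoc choices.

```python
import math


def norm_pdf(x):
    return math.exp(-0.5 * x * x) / math.sqrt(2.0 * math.pi)


def norm_cdf(x):
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))


def d1(S, K, r, v, tau):
    return (math.log(S / K) + (r + 0.5 * v * v) * tau) / (v * math.sqrt(tau))


def call_price(S, K, r, v, tau):
    """European call price; tau is the time to maturity T - t in years."""
    a = d1(S, K, r, v, tau)
    b = a - v * math.sqrt(tau)
    return S * norm_cdf(a) - K * math.exp(-r * tau) * norm_cdf(b)


def central_diff(f, x, h):
    """Symmetric finite-difference estimate of f'(x)."""
    return (f(x + h) - f(x - h)) / (2.0 * h)


S, K, r, v, tau = 40.0, 45.0, 0.08, 0.05, 3.0
a = d1(S, K, r, v, tau)
b = a - v * math.sqrt(tau)

# Closed-form Greeks, unscaled (no division by 100 or 365).
closed = {
    "delta": norm_cdf(a),
    "vega": S * norm_pdf(a) * math.sqrt(tau),
    "rho": K * tau * math.exp(-r * tau) * norm_cdf(b),
}

# The same sensitivities estimated by bumping one input at a time.
numeric = {
    "delta": central_diff(lambda s: call_price(s, K, r, v, tau), S, 1e-3),
    "vega": central_diff(lambda x: call_price(S, K, r, x, tau), v, 1e-5),
    "rho": central_diff(lambda x: call_price(S, K, x, v, tau), r, 1e-5),
}

for name in closed:
    print(name, closed[name], numeric[name])
```

The two columns should agree to several decimal places; a large gap usually points at a typo in the closed-form expression.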